Skip to content

Commit

Permalink
crates/{models,tables,agent}: refactor CatalogType, Capability, and R…
Browse files Browse the repository at this point in the history
…oleGrants

Push these down into the `models` and `tables` crates, with conditional sqlx support.

Add new routines for efficient evaluation of transitive role grants.
  • Loading branch information
jgraettinger committed Sep 5, 2024
1 parent 800fcf0 commit bf7ae34
Show file tree
Hide file tree
Showing 12 changed files with 339 additions and 143 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

85 changes: 3 additions & 82 deletions crates/agent-sql/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ pub mod live_specs;
pub mod publications;
use serde::{Deserialize, Serialize};
use sqlx::types::Uuid;
use std::fmt::{self, Display};

pub use models::Id;

mod text_json;
pub use text_json::TextJson;

pub use models::{Capability, CatalogType, Id};
pub use tables::RoleGrant;

#[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize, sqlx::Type)]
#[sqlx(type_name = "flow_type")]
#[sqlx(rename_all = "snake_case")]
Expand All @@ -38,85 +38,6 @@ impl From<CatalogType> for FlowType {
}
}

#[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize, sqlx::Type)]
#[sqlx(type_name = "catalog_spec_type")]
#[sqlx(rename_all = "lowercase")]
pub enum CatalogType {
Capture,
Collection,
Materialization,
Test,
}

impl sqlx::postgres::PgHasArrayType for CatalogType {
fn array_type_info() -> sqlx::postgres::PgTypeInfo {
sqlx::postgres::PgTypeInfo::with_name("_catalog_spec_type")
}
}

impl Display for CatalogType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let s = match *self {
CatalogType::Capture => "capture",
CatalogType::Collection => "collection",
CatalogType::Materialization => "materialization",
CatalogType::Test => "test",
};
f.write_str(s)
}
}

impl Into<models::CatalogType> for CatalogType {
fn into(self) -> models::CatalogType {
match self {
CatalogType::Capture => models::CatalogType::Capture,
CatalogType::Collection => models::CatalogType::Collection,
CatalogType::Materialization => models::CatalogType::Materialization,
CatalogType::Test => models::CatalogType::Test,
}
}
}

impl From<models::CatalogType> for CatalogType {
fn from(m: models::CatalogType) -> Self {
match m {
models::CatalogType::Capture => CatalogType::Capture,
models::CatalogType::Collection => CatalogType::Collection,
models::CatalogType::Materialization => CatalogType::Materialization,
models::CatalogType::Test => CatalogType::Test,
}
}
}

/// Note that the discriminants here align with those in the database type.
#[derive(
Debug,
Copy,
Clone,
PartialEq,
Eq,
PartialOrd,
Serialize,
Deserialize,
sqlx::Type,
schemars::JsonSchema,
)]
#[sqlx(type_name = "grant_capability")]
#[sqlx(rename_all = "lowercase")]
#[serde(rename_all = "camelCase")]
pub enum Capability {
Read = 0x10,
Write = 0x20,
Admin = 0x30,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct RoleGrant {
pub subject_role: String,
pub object_role: String,
pub capability: Capability,
}

/// Returns the user ID for the given email address, or an error if the email address is not found.
pub async fn get_user_id_for_email(email: &str, db: &sqlx::PgPool) -> sqlx::Result<Uuid> {
sqlx::query_scalar!(
Expand Down
2 changes: 1 addition & 1 deletion crates/agent/src/api/create_data_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub struct Manual {
pub struct Request {
/// Base name of this data-plane, such as "gcp-us-central1-c1".
#[validate]
name: models::PartitionField,
name: models::Token,

/// Private tenant to which this data-plane is provisioned,
/// or if None the data-plane is public.
Expand Down
3 changes: 1 addition & 2 deletions crates/agent/src/directives/beta_onboard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ pub struct Directive {}
#[derive(Deserialize, Validate, schemars::JsonSchema)]
#[serde(rename_all = "camelCase", deny_unknown_fields)]
pub struct Claims {
// TODO(johnny): Introduce models::Tenant which, like PartitionField, also uses TOKEN_RE.
#[validate]
requested_tenant: models::PartitionField,
requested_tenant: models::Token,
// Survey results for the tenant.
// This is persisted in the DB but is not actually used by the agent.
#[allow(dead_code)]
Expand Down
10 changes: 4 additions & 6 deletions crates/agent/src/publications/specs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -641,11 +641,9 @@ pub async fn resolve_live_specs(
continue;
}
for source in reads_from {
if !spec_row
.spec_capabilities
.iter()
.any(|c| source.starts_with(&c.object_role) && c.capability >= Capability::Read)
{
if !spec_row.spec_capabilities.iter().any(|c| {
source.starts_with(c.object_role.as_str()) && c.capability >= Capability::Read
}) {
live.errors.push(tables::Error {
scope: scope.clone(),
error: anyhow::anyhow!(
Expand All @@ -657,7 +655,7 @@ pub async fn resolve_live_specs(
}
for target in writes_to {
if !spec_row.spec_capabilities.iter().any(|c| {
target.starts_with(&c.object_role)
target.starts_with(c.object_role.as_str())
&& matches!(c.capability, Capability::Write | Capability::Admin)
}) {
live.errors.push(tables::Error {
Expand Down
70 changes: 70 additions & 0 deletions crates/models/src/catalogs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,37 @@ pub struct Catalog {
pub tests: BTreeMap<Test, TestDef>,
}

#[derive(Serialize, Deserialize, Clone, Copy, Debug, JsonSchema, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
#[cfg_attr(
feature = "sqlx-support",
derive(sqlx::Type),
sqlx(type_name = "catalog_spec_type", rename_all = "lowercase")
)]
pub enum CatalogType {
Capture,
Collection,
Materialization,
Test,
}

/// Capability within the Estuary role-based access control (RBAC) authorization system.
#[derive(
Serialize, Deserialize, Clone, Copy, Debug, JsonSchema, PartialEq, Eq, PartialOrd, Ord, Hash,
)]
#[serde(rename_all = "lowercase")]
#[cfg_attr(
feature = "sqlx-support",
derive(sqlx::Type),
sqlx(type_name = "grant_capability", rename_all = "lowercase")
)]
pub enum Capability {
/// Note that the discriminants here align with those in the database type.
Read = 10,
Write = 20,
Admin = 30,
}

impl Catalog {
/// Build a root JSON schema for the Catalog model.
pub fn root_json_schema() -> schemars::schema::RootSchema {
Expand Down Expand Up @@ -199,3 +230,42 @@ fn tests_schema(gen: &mut schemars::gen::SchemaGenerator) -> schemars::schema::S
}))
.unwrap()
}

#[cfg(feature = "sqlx-support")]
impl sqlx::postgres::PgHasArrayType for CatalogType {
fn array_type_info() -> sqlx::postgres::PgTypeInfo {
sqlx::postgres::PgTypeInfo::with_name("_catalog_spec_type")
}
}

impl std::str::FromStr for CatalogType {
type Err = ();

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"capture" => Ok(CatalogType::Capture),
"collection" => Ok(CatalogType::Collection),
"materialization" => Ok(CatalogType::Materialization),
"test" => Ok(CatalogType::Test),
_ => Err(()),
}
}
}

impl std::fmt::Display for CatalogType {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.write_str(self.as_ref())
}
}

impl std::convert::AsRef<str> for CatalogType {
fn as_ref(&self) -> &str {
// These strings match what's used by serde, and also match the definitions in the database.
match *self {
CatalogType::Capture => "capture",
CatalogType::Collection => "collection",
CatalogType::Materialization => "materialization",
CatalogType::Test => "test",
}
}
}
53 changes: 5 additions & 48 deletions crates/models/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
pub mod collate;
use std::collections::BTreeSet;

mod captures;
mod catalogs;
pub mod collate;
mod collections;
mod connector;
mod derivation;
Expand All @@ -18,11 +19,9 @@ mod shards;
mod source;
mod tests;

use std::collections::BTreeSet;

pub use crate::labels::{Label, LabelSelector, LabelSet};
pub use captures::{AutoDiscover, CaptureBinding, CaptureDef, CaptureEndpoint};
pub use catalogs::Catalog;
pub use catalogs::{Capability, Catalog, CatalogType};
pub use collections::{CollectionDef, Projection};
pub use connector::{split_image_tag, ConnectorConfig, LocalConfig};
pub use derivation::{Derivation, DeriveUsing, Shuffle, ShuffleType, TransformDef};
Expand All @@ -40,11 +39,10 @@ pub use materializations::{
};
pub use raw_value::RawValue;
pub use references::{
Capture, Collection, CompositeKey, Field, JsonPointer, Materialization, PartitionField, Prefix,
RelativeUrl, StorageEndpoint, Test, Transform, CATALOG_PREFIX_RE, TOKEN_RE,
Capture, Collection, CompositeKey, Field, JsonPointer, Materialization, Name, PartitionField,
Prefix, RelativeUrl, StorageEndpoint, Test, Token, Transform, CATALOG_PREFIX_RE, TOKEN_RE,
};
pub use schemas::Schema;
use serde::{Deserialize, Serialize};
pub use shards::ShardTemplate;
pub use source::{FullSource, OnIncompatibleSchemaChange, PartitionSelector, Source};
pub use tests::{TestDef, TestDocuments, TestStep, TestStepIngest, TestStepVerify};
Expand Down Expand Up @@ -98,47 +96,6 @@ pub trait ModelDef:
}
}

#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum CatalogType {
Capture,
Collection,
Materialization,
Test,
}

impl std::str::FromStr for CatalogType {
type Err = ();

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"capture" => Ok(CatalogType::Capture),
"collection" => Ok(CatalogType::Collection),
"materialization" => Ok(CatalogType::Materialization),
"test" => Ok(CatalogType::Test),
_ => Err(()),
}
}
}

impl std::fmt::Display for CatalogType {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.write_str(self.as_ref())
}
}

impl std::convert::AsRef<str> for CatalogType {
fn as_ref(&self) -> &str {
// These strings match what's used by serde, and also match the definitions in the database.
match *self {
CatalogType::Capture => "capture",
CatalogType::Collection => "collection",
CatalogType::Materialization => "materialization",
CatalogType::Test => "test",
}
}
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum AnySpec {
Expand Down
6 changes: 6 additions & 0 deletions crates/models/src/references.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,12 @@ macro_rules! string_reference_types {
// of passing it into another macro.

string_reference_types! {
/// Token is Unicode letters, numbers, '-', '_', or '.'.
pub struct Token("Token::schema", pattern = TOKEN_RE, example = "token");

/// Catalog Name is a series of tokens separated by a forward slash.
pub struct Name("Name::schema", pattern = CATALOG_NAME_RE, example = "acmeCo/name");

/// Collection names are paths of Unicode letters, numbers, '-', '_', or '.'.
/// Each path component is separated by a slash '/',
/// and a name may not begin or end in a '/'.
Expand Down
4 changes: 4 additions & 0 deletions crates/tables/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ serde_json = { workspace = true }
superslice = { workspace = true }
url = { workspace = true }

[dev-dependencies]

insta = { workspace = true }

[features]
default = []

Expand Down
Loading

0 comments on commit bf7ae34

Please sign in to comment.