Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: try adding FragmentRelation in meta store #20035

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/meta/model/migration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ mod m20241121_101830_table_engine;
mod m20241125_043732_connection_params;
mod m20241202_071413_resource_group;
mod m20241226_074013_clean_watermark_index_in_pk;
mod m20250106_072104_fragment_relation;
mod utils;

pub struct Migrator;
Expand Down Expand Up @@ -100,6 +101,7 @@ impl MigratorTrait for Migrator {
Box::new(m20241121_101830_table_engine::Migration),
Box::new(m20241202_071413_resource_group::Migration),
Box::new(m20241226_074013_clean_watermark_index_in_pk::Migration),
Box::new(m20250106_072104_fragment_relation::Migration),
]
}
}
199 changes: 199 additions & 0 deletions src/meta/model/migration/src/m20250106_072104_fragment_relation.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
use sea_orm::{FromJsonQueryResult, FromQueryResult, Statement};
use sea_orm_migration::prelude::*;
use serde::{Deserialize, Serialize};

use crate::drop_tables;

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.create_table(
Table::create()
.table(FragmentRelation::Table)
.col(
ColumnDef::new(FragmentRelation::SourceFragmentId)
.integer()
.not_null(),
)
.col(
ColumnDef::new(FragmentRelation::TargetFragmentId)
.integer()
.not_null(),
)
.col(
ColumnDef::new(FragmentRelation::DispatcherType)
.string()
.not_null(),
)
.col(
ColumnDef::new(FragmentRelation::DistKeyIndices)
.json_binary()
.not_null(),
)
.col(
ColumnDef::new(FragmentRelation::OutputIndices)
.json_binary()
.not_null(),
)
.primary_key(
Index::create()
.col(FragmentRelation::SourceFragmentId)
.col(FragmentRelation::TargetFragmentId),
)
.foreign_key(
&mut ForeignKey::create()
.name("FK_fragment_relation_source_oid")
.from(FragmentRelation::Table, FragmentRelation::SourceFragmentId)
.to(Fragment::Table, Fragment::FragmentId)
.on_delete(ForeignKeyAction::Cascade)
.to_owned(),
)
.foreign_key(
&mut ForeignKey::create()
.name("FK_fragment_relation_target_oid")
.from(FragmentRelation::Table, FragmentRelation::TargetFragmentId)
.to(Fragment::Table, Fragment::FragmentId)
.on_delete(ForeignKeyAction::Cascade)
.to_owned(),
)
.to_owned(),
)
.await?;

fulfill_fragment_relation(manager).await?;

Ok(())
}

async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
drop_tables!(manager, FragmentRelation);
Ok(())
}
}

// Fulfill the FragmentRelation table with data from the Actor and ActorDispatcher tables
async fn fulfill_fragment_relation(manager: &SchemaManager<'_>) -> Result<(), DbErr> {
let connection = manager.get_connection();

let database_backend = connection.get_database_backend();

let (sql, values) = Query::select()
.distinct_on([
FragmentRelation::SourceFragmentId,
FragmentRelation::TargetFragmentId,
])
.expr_as(
Expr::col((Actor::Table, Actor::FragmentId)),
FragmentRelation::SourceFragmentId,
)
.expr_as(
Expr::col((ActorDispatcher::Table, ActorDispatcher::DispatcherId)),
FragmentRelation::TargetFragmentId,
)
.columns([
(ActorDispatcher::Table, ActorDispatcher::DispatcherType),
(ActorDispatcher::Table, ActorDispatcher::DistKeyIndices),
(ActorDispatcher::Table, ActorDispatcher::OutputIndices),
])
.from(Actor::Table)
.join(
JoinType::InnerJoin,
ActorDispatcher::Table,
Expr::col((Actor::Table, Actor::ActorId))
.equals((ActorDispatcher::Table, ActorDispatcher::ActorId)),
)
.to_owned()
.build_any(&*database_backend.get_query_builder());

let rows = connection
.query_all(Statement::from_sql_and_values(
database_backend,
sql,
values,
))
.await?;

for row in rows {
let FragmentRelationEntity {
source_fragment_id,
target_fragment_id,
dispatcher_type,
dist_key_indices,
output_indices,
} = FragmentRelationEntity::from_query_result(&row, "")?;

manager
.exec_stmt(
Query::insert()
.into_table(FragmentRelation::Table)
.columns([
FragmentRelation::SourceFragmentId,
FragmentRelation::TargetFragmentId,
FragmentRelation::DispatcherType,
FragmentRelation::DistKeyIndices,
FragmentRelation::OutputIndices,
])
.values_panic([
source_fragment_id.into(),
target_fragment_id.into(),
dispatcher_type.into(),
dist_key_indices.into(),
output_indices.into(),
])
.to_owned(),
)
.await?;
}

Ok(())
}

#[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize, Default)]
pub struct I32Array(pub Vec<i32>);

#[derive(Debug, FromQueryResult)]
#[sea_orm(entity = "FragmentRelation")]
pub struct FragmentRelationEntity {
source_fragment_id: i32,
target_fragment_id: i32,
dispatcher_type: String,
dist_key_indices: I32Array,
output_indices: I32Array,
}

#[derive(DeriveIden)]
enum FragmentRelation {
Table,
SourceFragmentId,
TargetFragmentId,
DispatcherType,
DistKeyIndices,
OutputIndices,
}

#[derive(DeriveIden)]
enum Fragment {
Table,
FragmentId,
}

#[derive(DeriveIden)]
enum Actor {
Table,
ActorId,
FragmentId,
}

#[derive(DeriveIden)]
enum ActorDispatcher {
Table,
ActorId,
DispatcherType,
DistKeyIndices,
OutputIndices,
DispatcherId,
}
53 changes: 53 additions & 0 deletions src/meta/model/src/fragment_relation.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};

use crate::actor_dispatcher::DispatcherType;
use crate::{FragmentId, I32Array};

#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "fragment_relation")]
pub struct Model {
#[sea_orm(primary_key)]
pub source_fragment_id: FragmentId,
#[sea_orm(primary_key)]
pub target_fragment_id: FragmentId,
pub dispatcher_type: DispatcherType,
pub dist_key_indices: I32Array,
pub output_indices: I32Array,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(
belongs_to = "super::fragment::Entity",
from = "Column::SourceFragmentId",
to = "super::fragment::Column::FragmentId",
on_update = "NoAction",
on_delete = "Cascade"
)]
SourceFragment,
#[sea_orm(
belongs_to = "super::fragment::Entity",
from = "Column::TargetFragmentId",
to = "super::fragment::Column::FragmentId",
on_update = "NoAction",
on_delete = "Cascade"
)]
TargetFragment,
}

impl ActiveModelBehavior for ActiveModel {}
1 change: 1 addition & 0 deletions src/meta/model/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub mod compaction_task;
pub mod connection;
pub mod database;
pub mod fragment;
pub mod fragment_relation;
pub mod function;
pub mod hummock_epoch_to_version;
pub mod hummock_gc_history;
Expand Down
1 change: 1 addition & 0 deletions src/meta/model/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub use super::compaction_task::Entity as CompactionTask;
pub use super::connection::Entity as Connection;
pub use super::database::Entity as Database;
pub use super::fragment::Entity as Fragment;
pub use super::fragment_relation::Entity as FragmentRelation;
pub use super::function::Entity as Function;
pub use super::hummock_pinned_snapshot::Entity as HummockPinnedSnapshot;
pub use super::hummock_pinned_version::Entity as HummockPinnedVersion;
Expand Down
Loading
Loading