Skip to content

Commit

Permalink
Optimize FragmentRelation population: Use DISTINCT, InnerJoin, and im…
Browse files Browse the repository at this point in the history
…prove result processing

Signed-off-by: Shanicky Chen <[email protected]>
  • Loading branch information
shanicky committed Jan 7, 2025
1 parent 9a6c069 commit 6458934
Showing 1 changed file with 33 additions and 16 deletions.
49 changes: 33 additions & 16 deletions src/meta/model/migration/src/m20250106_072104_fragment_relation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,42 +75,57 @@ impl MigrationTrait for Migration {
}
}

// Set worker parallelism based on the number of parallel unit ids
// Fulfill the FragmentRelation table with data from the Actor and ActorDispatcher tables
async fn fulfill_fragment_relation(manager: &SchemaManager<'_>) -> Result<(), DbErr> {
let connection = manager.get_connection();

let database_backend = connection.get_database_backend();

let (sql, values) = Query::select()
.columns([(Actor::Table, Actor::FragmentId)])
.distinct_on([
FragmentRelation::SourceFragmentId,
FragmentRelation::TargetFragmentId,
])
.expr_as(
Expr::col((Actor::Table, Actor::FragmentId)),
FragmentRelation::SourceFragmentId,
)
.expr_as(
Expr::col((ActorDispatcher::Table, ActorDispatcher::DispatcherId)),
FragmentRelation::TargetFragmentId,
)
.columns([
(ActorDispatcher::Table, ActorDispatcher::DispatcherId),
(ActorDispatcher::Table, ActorDispatcher::DispatcherType),
(ActorDispatcher::Table, ActorDispatcher::DistKeyIndices),
(ActorDispatcher::Table, ActorDispatcher::OutputIndices),
])
.from(Actor::Table)
.join(
JoinType::LeftJoin,
JoinType::InnerJoin,
ActorDispatcher::Table,
Expr::col((Actor::Table, Actor::ActorId))
.equals((ActorDispatcher::Table, ActorDispatcher::ActorId)),
)
.to_owned()
.build_any(&*database_backend.get_query_builder());

let stmt = Statement::from_sql_and_values(database_backend, sql, values);

for FragmentRelationEntity {
source_fragment_id,
target_fragment_id,
dispatcher_type,
dist_key_indices,
output_indices,
} in FragmentRelationEntity::find_by_statement(stmt)
.all(connection)
.await?
{
let rows = connection
.query_all(Statement::from_sql_and_values(
database_backend,
sql,
values,
))
.await?;

for row in rows {
let FragmentRelationEntity {
source_fragment_id,
target_fragment_id,
dispatcher_type,
dist_key_indices,
output_indices,
} = FragmentRelationEntity::from_query_result(&row, "")?;

manager
.exec_stmt(
Query::insert()
Expand All @@ -133,13 +148,15 @@ async fn fulfill_fragment_relation(manager: &SchemaManager<'_>) -> Result<(), Db
)
.await?;
}

Ok(())
}

/// Newtype wrapper around a `Vec<i32>`.
///
/// Derives `FromJsonQueryResult` together with `Serialize`/`Deserialize`, so it is
/// presumably decoded from a JSON-encoded column — TODO confirm against the column definitions.
/// The derived `Default` yields an empty vector.
#[derive(Clone, Debug, PartialEq, FromJsonQueryResult, Serialize, Deserialize, Default)]
pub struct I32Array(pub Vec<i32>);

#[derive(Debug, FromQueryResult)]
#[sea_orm(entity = "FragmentRelation")]
pub struct FragmentRelationEntity {
source_fragment_id: i32,
target_fragment_id: i32,
Expand Down

0 comments on commit 6458934

Please sign in to comment.