wuwenchi committed Apr 26, 2024
1 parent c2c250c commit c01b9ce
Showing 17 changed files with 777 additions and 3 deletions.
@@ -26,9 +26,11 @@
import org.apache.doris.thrift.TIcebergTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
import org.apache.iceberg.Table;

import java.util.HashMap;
import java.util.List;
import java.util.Set;

public class IcebergExternalTable extends ExternalTable {

@@ -81,4 +83,13 @@ public long fetchRowCount() {
makeSureInitialized();
return IcebergUtils.getIcebergRowCount(getCatalog(), getDbName(), getName());
}

public Table getIcebergTable() {
return IcebergUtils.getIcebergTable(getCatalog(), getDbName(), getName());
}

@Override
public Set<String> getPartitionNames() {
return super.getPartitionNames();
}
}
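For orientation, a minimal sketch of how the new getIcebergTable() accessor above might be consumed. The helper class and its name are illustrative assumptions, not part of this commit; the Iceberg calls are the library's native API.

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;

// Hypothetical helper (not in this commit): fetch the native Iceberg handle
// through the new accessor and check whether the table declares a partition spec.
public final class IcebergTableInspector {
    public static boolean isPartitioned(IcebergExternalTable table) {
        Table icebergTable = table.getIcebergTable();
        PartitionSpec spec = icebergTable.spec();
        return spec.isPartitioned();
    }
}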
@@ -0,0 +1,148 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.analyzer;

import org.apache.doris.nereids.exceptions.UnboundException;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.properties.UnboundLogicalProperties;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.plans.BlockFuncDepsPropagation;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.algebra.Sink;
import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
import org.apache.doris.nereids.trees.plans.logical.UnboundLogicalSink;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.Utils;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;

import java.util.List;
import java.util.Objects;
import java.util.Optional;

/**
* Represents an Iceberg table sink plan node that has not been bound.
*/
public class UnboundIcebergTableSink<CHILD_TYPE extends Plan> extends UnboundLogicalSink<CHILD_TYPE>
implements Unbound, Sink, BlockFuncDepsPropagation {
private final List<String> hints;
private final List<String> partitions;

public UnboundIcebergTableSink(List<String> nameParts, List<String> colNames, List<String> hints,
List<String> partitions, CHILD_TYPE child) {
this(nameParts, colNames, hints, partitions, DMLCommandType.NONE,
Optional.empty(), Optional.empty(), child);
}

/**
* constructor
*/
public UnboundIcebergTableSink(List<String> nameParts, List<String> colNames, List<String> hints,
List<String> partitions,
DMLCommandType dmlCommandType,
Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties,
CHILD_TYPE child) {
super(nameParts, PlanType.LOGICAL_UNBOUND_ICEBERG_TABLE_SINK, ImmutableList.of(), groupExpression,
logicalProperties, colNames, dmlCommandType, child);
this.hints = Utils.copyRequiredList(hints);
this.partitions = Utils.copyRequiredList(partitions);
}

public List<String> getColNames() {
return colNames;
}

public List<String> getPartitions() {
return partitions;
}

public List<String> getHints() {
return hints;
}

@Override
public Plan withChildren(List<Plan> children) {
Preconditions.checkArgument(children.size() == 1,
"UnboundHiveTableSink only accepts one child");
return new UnboundIcebergTableSink<>(nameParts, colNames, hints, partitions,
dmlCommandType, groupExpression, Optional.empty(), children.get(0));
}

@Override
public UnboundIcebergTableSink<CHILD_TYPE> withOutputExprs(List<NamedExpression> outputExprs) {
throw new UnboundException("could not call withOutputExprs on UnboundHiveTableSink");
}

@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitUnboundIcebergTableSink(this, context);
}

@Override
public List<? extends Expression> getExpressions() {
throw new UnsupportedOperationException(this.getClass().getSimpleName() + " does not support getExpressions()");
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
UnboundIcebergTableSink<?> that = (UnboundIcebergTableSink<?>) o;
return Objects.equals(nameParts, that.nameParts)
&& Objects.equals(colNames, that.colNames)
&& Objects.equals(hints, that.hints)
&& Objects.equals(partitions, that.partitions);
}

@Override
public int hashCode() {
return Objects.hash(nameParts, colNames, hints, partitions);
}

@Override
public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
return new UnboundIcebergTableSink<>(nameParts, colNames, hints, partitions,
dmlCommandType, groupExpression, Optional.of(getLogicalProperties()), child());
}

@Override
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
return new UnboundIcebergTableSink<>(nameParts, colNames, hints, partitions,
dmlCommandType, groupExpression, logicalProperties, children.get(0));
}

@Override
public LogicalProperties computeLogicalProperties() {
return UnboundLogicalProperties.INSTANCE;
}

@Override
public List<Slot> computeOutput() {
throw new UnboundException("output");
}
}
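The accept method above dispatches to visitUnboundIcebergTableSink, which this commit presumably adds to the Nereids visitor hierarchy. A sketch of that hook, following the usual convention of defaulting a new sink's visit to the generic sink visit; the standalone interface and the delegation target are assumptions for illustration.

// Sketch only: the visitor hook assumed to accompany this node, shown as a
// self-contained interface rather than the real Doris sink visitor.
public interface IcebergSinkVisitorSketch<R, C> {
    R visitLogicalSink(Plan sink, C context); // stand-in for the existing generic hook

    default R visitUnboundIcebergTableSink(UnboundIcebergTableSink<? extends Plan> sink, C context) {
        // By convention, a newly added plan node falls back to the generic visit.
        return visitLogicalSink(sink, context);
    }
}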
@@ -123,6 +123,7 @@
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalIntersect;
import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
@@ -462,6 +463,27 @@ public PlanFragment visitPhysicalHiveTableSink(PhysicalHiveTableSink<? extends P
return rootFragment;
}

@Override
public PlanFragment visitPhysicalIcebergTableSink(PhysicalIcebergTableSink<? extends Plan> icebergTableSink,
PlanTranslatorContext context) {
PlanFragment rootFragment = icebergTableSink.child().accept(this, context);
rootFragment.setOutputPartition(DataPartition.UNPARTITIONED);

TupleDescriptor icebergTuple = context.generateTupleDesc();
List<Column> targetTableColumns = icebergTableSink.getTargetTable().getFullSchema();
for (Column column : targetTableColumns) {
SlotDescriptor slotDesc = context.addSlotDesc(icebergTuple);
slotDesc.setIsMaterialized(true);
slotDesc.setType(column.getType());
slotDesc.setColumn(column);
slotDesc.setIsNullable(column.isAllowNull());
slotDesc.setAutoInc(column.isAutoInc());
}
IcebergTableSink sink = new IcebergTableSink(icebergTableSink.getTargetTable());
rootFragment.setSink(sink);
return rootFragment;
}

@Override
public PlanFragment visitPhysicalFileSink(PhysicalFileSink<? extends Plan> fileSink,
PlanTranslatorContext context) {
Expand Down Expand Up @@ -2617,6 +2639,7 @@ private DataPartition toDataPartition(DistributionSpec distributionSpec,
DistributionSpecTableSinkHashPartitioned partitionSpecHash =
(DistributionSpecTableSinkHashPartitioned) distributionSpec;
List<Expr> partitionExprs = Lists.newArrayList();
// TODO(wuwenchi): convert to partitionExpr
List<ExprId> partitionExprIds = partitionSpecHash.getOutputColExprIds();
for (ExprId partitionExprId : partitionExprIds) {
if (childOutputIds.contains(partitionExprId)) {
@@ -25,6 +25,7 @@
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalIcebergTableSink;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapTableSink;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.VariableMgr;
@@ -59,6 +60,12 @@ public Plan visitLogicalHiveTableSink(LogicalHiveTableSink<? extends Plan> table
return tableSink;
}

@Override
public Plan visitLogicalIcebergTableSink(LogicalIcebergTableSink<? extends Plan> tableSink, StatementContext context) {
turnOffPageCache(context);
return tableSink;
}

private void turnOffPageCache(StatementContext context) {
SessionVariable sessionVariable = context.getConnectContext().getSessionVariable();
// set temporary session value, and then revert value in the 'finally block' of StmtExecutor#execute
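The comment above describes a set-then-revert pattern for session variables. A distilled sketch of that pattern; the enablePageCache field name is an assumption for illustration, and the revert is described by the original comment as happening in StmtExecutor#execute.

// Illustrative pattern only (not the committed code): temporarily change a
// session flag for one statement and restore it afterwards.
SessionVariable sv = context.getConnectContext().getSessionVariable();
boolean saved = sv.enablePageCache;   // field name assumed
try {
    sv.enablePageCache = false;       // temporary value for this statement
    // ... plan and execute the INSERT ...
} finally {
    sv.enablePageCache = saved;       // revert, as StmtExecutor#execute's finally block does
}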
@@ -39,6 +39,7 @@
import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
@@ -140,6 +141,16 @@ public Void visitPhysicalHiveTableSink(PhysicalHiveTableSink<? extends Plan> hiv
return null;
}

@Override
public Void visitPhysicalIcebergTableSink(PhysicalIcebergTableSink<? extends Plan> icebergTableSink, PlanContext context) {
if (connectContext != null && !connectContext.getSessionVariable().enableStrictConsistencyDml) {
addRequestPropertyToChildren(PhysicalProperties.ANY);
} else {
addRequestPropertyToChildren(icebergTableSink.getRequirePhysicalProperties());
}
return null;
}

@Override
public Void visitPhysicalResultSink(PhysicalResultSink<? extends Plan> physicalResultSink, PlanContext context) {
addRequestPropertyToChildren(PhysicalProperties.GATHER);
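The Iceberg branch above mirrors the Hive sink: with strict-consistency DML disabled the child may keep any distribution, otherwise it must satisfy the sink's required physical properties. The same decision distilled into a standalone helper; the wrapper class is hypothetical and not part of this commit.

import org.apache.doris.nereids.properties.PhysicalProperties;

// Hypothetical helper restating the branch above: strict-consistency DML
// forces the sink's required distribution onto the child; otherwise any
// distribution is acceptable.
public final class SinkChildRequirement {
    static PhysicalProperties of(boolean strictConsistencyDml, PhysicalProperties sinkRequirement) {
        return strictConsistencyDml ? sinkRequirement : PhysicalProperties.ANY;
    }
}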
@@ -66,6 +66,7 @@
import org.apache.doris.nereids.rules.implementation.LogicalFilterToPhysicalFilter;
import org.apache.doris.nereids.rules.implementation.LogicalGenerateToPhysicalGenerate;
import org.apache.doris.nereids.rules.implementation.LogicalHiveTableSinkToPhysicalHiveTableSink;
import org.apache.doris.nereids.rules.implementation.LogicalIcebergTableSinkToPhysicalIcebergTableSink;
import org.apache.doris.nereids.rules.implementation.LogicalIntersectToPhysicalIntersect;
import org.apache.doris.nereids.rules.implementation.LogicalJdbcScanToPhysicalJdbcScan;
import org.apache.doris.nereids.rules.implementation.LogicalJoinToHashJoin;
@@ -191,6 +192,7 @@ public class RuleSet {
.add(new LogicalGenerateToPhysicalGenerate())
.add(new LogicalOlapTableSinkToPhysicalOlapTableSink())
.add(new LogicalHiveTableSinkToPhysicalHiveTableSink())
.add(new LogicalIcebergTableSinkToPhysicalIcebergTableSink())
.add(new LogicalFileSinkToPhysicalFileSink())
.add(new LogicalResultSinkToPhysicalResultSink())
.add(new LogicalDeferMaterializeResultSinkToPhysicalDeferMaterializeResultSink())
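For context, a sketch of what the implementation rule registered above might look like, modeled on its Hive counterpart; the logicalIcebergTableSink() pattern factory and the PhysicalIcebergTableSink constructor arguments are assumptions, not confirmed by this diff.

// Sketch only, modeled on LogicalHiveTableSinkToPhysicalHiveTableSink;
// accessor names and constructor arguments are assumed.
public class LogicalIcebergTableSinkToPhysicalIcebergTableSink extends OneImplementationRuleFactory {
    @Override
    public Rule build() {
        return logicalIcebergTableSink().thenApply(ctx -> {
            LogicalIcebergTableSink<? extends Plan> sink = ctx.root;
            return new PhysicalIcebergTableSink<>(
                    sink.getDatabase(), sink.getTargetTable(), sink.getCols(),
                    sink.getOutputExprs(), Optional.empty(),
                    sink.getLogicalProperties(), sink.child());
        }).toRule(RuleType.LOGICAL_ICEBERG_TABLE_SINK_TO_PHYSICAL_ICEBERG_TABLE_SINK_RULE);
    }
}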
@@ -31,6 +31,7 @@ public enum RuleType {
// **** make sure BINDING_UNBOUND_LOGICAL_PLAN is the lowest priority in the rewrite rules. ****
BINDING_RESULT_SINK(RuleTypeClass.REWRITE),
BINDING_INSERT_HIVE_TABLE(RuleTypeClass.REWRITE),
BINDING_INSERT_ICEBERG_TABLE(RuleTypeClass.REWRITE),
BINDING_INSERT_TARGET_TABLE(RuleTypeClass.REWRITE),
BINDING_INSERT_FILE(RuleTypeClass.REWRITE),
BINDING_ONE_ROW_RELATION_SLOT(RuleTypeClass.REWRITE),
@@ -394,6 +395,7 @@ public enum RuleType {
LOGICAL_ES_SCAN_TO_PHYSICAL_ES_SCAN_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_OLAP_TABLE_SINK_TO_PHYSICAL_OLAP_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_HIVE_TABLE_SINK_TO_PHYSICAL_HIVE_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_ICEBERG_TABLE_SINK_TO_PHYSICAL_ICEBERG_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_RESULT_SINK_TO_PHYSICAL_RESULT_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_DEFER_MATERIALIZE_RESULT_SINK_TO_PHYSICAL_DEFER_MATERIALIZE_RESULT_SINK_RULE(RuleTypeClass.IMPLEMENTATION),
LOGICAL_FILE_SINK_TO_PHYSICAL_FILE_SINK_RULE(RuleTypeClass.IMPLEMENTATION),