Skip to content

Commit

Permalink
Fixed bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
pdabre12 authored and Pratik Joseph Dabre committed Jan 14, 2025
1 parent 96bd59e commit c785858
Show file tree
Hide file tree
Showing 10 changed files with 134 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.hive.s3.HiveS3Module;
import com.facebook.presto.plugin.base.security.AllowAllAccessControl;
import com.facebook.presto.spi.ConnectorSystemConfig;
import com.facebook.presto.spi.NodeManager;
import com.facebook.presto.spi.PageIndexerFactory;
import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
Expand Down Expand Up @@ -97,6 +98,7 @@ public static Connector createConnector(
binder.bind(FunctionMetadataManager.class).toInstance(context.getFunctionMetadataManager());
binder.bind(RowExpressionService.class).toInstance(context.getRowExpressionService());
binder.bind(FilterStatsCalculatorService.class).toInstance(context.getFilterStatsCalculatorService());
binder.bind(ConnectorSystemConfig.class).toInstance(context.getConnectorSystemConfig());
});

Injector injector = app
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.facebook.presto.spi.ConnectorPlanOptimizer;
import com.facebook.presto.spi.ConnectorPlanRewriter;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSystemConfig;
import com.facebook.presto.spi.ConnectorTableLayout;
import com.facebook.presto.spi.Constraint;
import com.facebook.presto.spi.DiscretePredicates;
Expand Down Expand Up @@ -81,34 +82,27 @@
public class IcebergMetadataOptimizer
implements ConnectorPlanOptimizer
{
public static final CatalogSchemaName JAVA_BUILTIN_NAMESPACE = new CatalogSchemaName("presto", "default");
private static final Set<QualifiedObjectName> ALLOWED_FUNCTIONS = ImmutableSet.of(
QualifiedObjectName.valueOf(JAVA_BUILTIN_NAMESPACE, "max"),
QualifiedObjectName.valueOf(JAVA_BUILTIN_NAMESPACE, "min"),
QualifiedObjectName.valueOf(JAVA_BUILTIN_NAMESPACE, "approx_distinct"));

// Min/Max could be folded into LEAST/GREATEST
private static final Map<QualifiedObjectName, QualifiedObjectName> AGGREGATION_SCALAR_MAPPING = ImmutableMap.of(
QualifiedObjectName.valueOf(JAVA_BUILTIN_NAMESPACE, "max"), QualifiedObjectName.valueOf(JAVA_BUILTIN_NAMESPACE, "greatest"),
QualifiedObjectName.valueOf(JAVA_BUILTIN_NAMESPACE, "min"), QualifiedObjectName.valueOf(JAVA_BUILTIN_NAMESPACE, "least"));

private final FunctionMetadataManager functionMetadataManager;
private final TypeManager typeManager;
private final IcebergTransactionManager icebergTransactionManager;
private final RowExpressionService rowExpressionService;
private final StandardFunctionResolution functionResolution;

public IcebergMetadataOptimizer(FunctionMetadataManager functionMetadataManager,
TypeManager typeManager,
IcebergTransactionManager icebergTransactionManager,
RowExpressionService rowExpressionService,
StandardFunctionResolution functionResolution)
private final ConnectorSystemConfig connectorSystemConfig;

public IcebergMetadataOptimizer(
FunctionMetadataManager functionMetadataManager,
TypeManager typeManager,
IcebergTransactionManager icebergTransactionManager,
RowExpressionService rowExpressionService,
StandardFunctionResolution functionResolution,
ConnectorSystemConfig connectorSystemConfig)
{
this.functionMetadataManager = requireNonNull(functionMetadataManager, "functionMetadataManager is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.icebergTransactionManager = requireNonNull(icebergTransactionManager, "icebergTransactionManager is null");
this.rowExpressionService = requireNonNull(rowExpressionService, "rowExpressionService is null");
this.functionResolution = requireNonNull(functionResolution, "functionResolution is null");
this.connectorSystemConfig = requireNonNull(connectorSystemConfig, "connectorSystemConfig is null");
}

@Override
Expand All @@ -121,7 +115,8 @@ public PlanNode optimize(PlanNode maxSubplan, ConnectorSession session, Variable
icebergTransactionManager,
rowExpressionService,
functionResolution,
rowsForMetadataOptimizationThreshold);
rowsForMetadataOptimizationThreshold,
connectorSystemConfig);
PlanNode rewrittenPlan = ConnectorPlanRewriter.rewriteWith(optimizer, maxSubplan, null);
return rewrittenPlan;
}
Expand All @@ -137,15 +132,18 @@ private static class Optimizer
private final RowExpressionService rowExpressionService;
private final StandardFunctionResolution functionResolution;
private final int rowsForMetadataOptimizationThreshold;
private final Set<QualifiedObjectName> allowedFunctions;
private final Map<QualifiedObjectName, QualifiedObjectName> aggregationScalarMapping;

private Optimizer(ConnectorSession connectorSession,
PlanNodeIdAllocator idAllocator,
FunctionMetadataManager functionMetadataManager,
TypeManager typeManager,
IcebergTransactionManager icebergTransactionManager,
RowExpressionService rowExpressionService,
StandardFunctionResolution functionResolution,
int rowsForMetadataOptimizationThreshold)
PlanNodeIdAllocator idAllocator,
FunctionMetadataManager functionMetadataManager,
TypeManager typeManager,
IcebergTransactionManager icebergTransactionManager,
RowExpressionService rowExpressionService,
StandardFunctionResolution functionResolution,
int rowsForMetadataOptimizationThreshold,
ConnectorSystemConfig connectorSystemConfig)
{
checkArgument(rowsForMetadataOptimizationThreshold >= 0, "The value of `rowsForMetadataOptimizationThreshold` should not less than 0");
this.connectorSession = connectorSession;
Expand All @@ -156,6 +154,15 @@ private Optimizer(ConnectorSession connectorSession,
this.functionResolution = functionResolution;
this.typeManager = typeManager;
this.rowsForMetadataOptimizationThreshold = rowsForMetadataOptimizationThreshold;
CatalogSchemaName defaultNamespace = requireNonNull(connectorSystemConfig, "connectorSystemConfig is null").getDefaultNamespace();
this.allowedFunctions = ImmutableSet.of(
QualifiedObjectName.valueOf(defaultNamespace, "max"),
QualifiedObjectName.valueOf(defaultNamespace, "min"),
QualifiedObjectName.valueOf(defaultNamespace, "approx_distinct"));
// Min/Max could be folded into LEAST/GREATEST
this.aggregationScalarMapping = ImmutableMap.of(
QualifiedObjectName.valueOf(defaultNamespace, "max"), QualifiedObjectName.valueOf(defaultNamespace, "greatest"),
QualifiedObjectName.valueOf(defaultNamespace, "min"), QualifiedObjectName.valueOf(defaultNamespace, "least"));
}

@Override
Expand All @@ -164,7 +171,7 @@ public PlanNode visitAggregation(AggregationNode node, RewriteContext<Void> cont
// supported functions are only MIN/MAX/APPROX_DISTINCT or distinct aggregates
for (Aggregation aggregation : node.getAggregations().values()) {
QualifiedObjectName functionName = functionMetadataManager.getFunctionMetadata(aggregation.getFunctionHandle()).getName();
if (!ALLOWED_FUNCTIONS.contains(functionName) && !aggregation.isDistinct()) {
if (!allowedFunctions.contains(functionName) && !aggregation.isDistinct()) {
return context.defaultRewrite(node);
}
}
Expand Down Expand Up @@ -270,7 +277,7 @@ private boolean isReducible(AggregationNode node, List<VariableReferenceExpressi
}
for (Aggregation aggregation : node.getAggregations().values()) {
FunctionMetadata functionMetadata = functionMetadataManager.getFunctionMetadata(aggregation.getFunctionHandle());
if (!AGGREGATION_SCALAR_MAPPING.containsKey(functionMetadata.getName()) ||
if (!aggregationScalarMapping.containsKey(functionMetadata.getName()) ||
functionMetadata.getArgumentTypes().size() > 1 ||
!inputs.containsAll(aggregation.getCall().getArguments())) {
return false;
Expand Down Expand Up @@ -340,7 +347,7 @@ private RowExpression evaluateMinMax(FunctionMetadata aggregationFunctionMetadat
return new ConstantExpression(Optional.empty(), null, returnType);
}

String scalarFunctionName = AGGREGATION_SCALAR_MAPPING.get(aggregationFunctionMetadata.getName()).getObjectName();
String scalarFunctionName = aggregationScalarMapping.get(aggregationFunctionMetadata.getName()).getObjectName();
while (arguments.size() > 1) {
List<RowExpression> reducedArguments = new ArrayList<>();
// We fold for every 100 values because GREATEST/LEAST has argument count limit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.iceberg.IcebergTransactionManager;
import com.facebook.presto.spi.ConnectorPlanOptimizer;
import com.facebook.presto.spi.ConnectorSystemConfig;
import com.facebook.presto.spi.connector.ConnectorPlanOptimizerProvider;
import com.facebook.presto.spi.function.FunctionMetadataManager;
import com.facebook.presto.spi.function.StandardFunctionResolution;
Expand All @@ -39,21 +40,23 @@ public IcebergPlanOptimizerProvider(
RowExpressionService rowExpressionService,
StandardFunctionResolution functionResolution,
FunctionMetadataManager functionMetadataManager,
TypeManager typeManager)
TypeManager typeManager,
ConnectorSystemConfig connectorSystemConfig)
{
requireNonNull(transactionManager, "transactionManager is null");
requireNonNull(rowExpressionService, "rowExpressionService is null");
requireNonNull(functionResolution, "functionResolution is null");
requireNonNull(functionMetadataManager, "functionMetadataManager is null");
requireNonNull(typeManager, "typeManager is null");
requireNonNull(connectorSystemConfig, "connectorSystemConfig is null");
this.planOptimizers = ImmutableSet.of(
new IcebergPlanOptimizer(functionResolution, rowExpressionService, functionMetadataManager, transactionManager),
new IcebergFilterPushdown(rowExpressionService, functionResolution, functionMetadataManager, transactionManager, typeManager),
new IcebergParquetDereferencePushDown(transactionManager, rowExpressionService, typeManager));
this.logicalPlanOptimizers = ImmutableSet.of(
new IcebergPlanOptimizer(functionResolution, rowExpressionService, functionMetadataManager, transactionManager),
new IcebergFilterPushdown(rowExpressionService, functionResolution, functionMetadataManager, transactionManager, typeManager),
new IcebergMetadataOptimizer(functionMetadataManager, typeManager, transactionManager, rowExpressionService, functionResolution),
new IcebergMetadataOptimizer(functionMetadataManager, typeManager, transactionManager, rowExpressionService, functionResolution, connectorSystemConfig),
new IcebergParquetDereferencePushDown(transactionManager, rowExpressionService, typeManager),
new IcebergEqualityDeleteAsJoin(functionResolution, transactionManager, typeManager));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.facebook.airlift.log.Logger;
import com.facebook.airlift.node.NodeInfo;
import com.facebook.presto.common.CatalogSchemaName;
import com.facebook.presto.common.block.BlockEncodingSerde;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.connector.informationSchema.InformationSchemaConnector;
Expand Down Expand Up @@ -181,7 +182,21 @@ public ConnectorManager(
this.determinismEvaluator = requireNonNull(determinismEvaluator, "determinismEvaluator is null");
this.filterStatsCalculator = requireNonNull(filterStatsCalculator, "filterStatsCalculator is null");
this.blockEncodingSerde = requireNonNull(blockEncodingSerde, "blockEncodingSerde is null");
this.connectorSystemConfig = () -> featuresConfig.isNativeExecutionEnabled();
this.connectorSystemConfig =
new ConnectorSystemConfig()
{
@Override
public boolean isNativeExecution()
{
return featuresConfig.isNativeExecutionEnabled();
}

@Override
public CatalogSchemaName getDefaultNamespace()
{
return metadataManager.getFunctionAndTypeManager().getDefaultNamespace();
}
};
}

@PreDestroy
Expand Down Expand Up @@ -314,8 +329,8 @@ private synchronized void addConnectorInternal(MaterializedConnector connector)
.ifPresent(metadataUpdaterProvider -> connectorMetadataUpdaterManager.addMetadataUpdaterProvider(connectorId, metadataUpdaterProvider));

connector.getConnectorTypeSerdeProvider()
.ifPresent(
connectorTypeSerdeProvider ->
.ifPresent(
connectorTypeSerdeProvider ->
connectorTypeSerdeManager.addConnectorTypeSerdeProvider(connectorId, connectorTypeSerdeProvider));

metadataManager.getProcedureRegistry().addProcedures(connectorId, connector.getProcedures());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,15 +379,17 @@ public void registerBuiltInFunctions(List<? extends SqlFunction> functions)
public List<SqlFunction> listFunctions(Session session, Optional<String> likePattern, Optional<String> escape)
{
ImmutableList.Builder<SqlFunction> functions = new ImmutableList.Builder<>();
if (!isListBuiltInFunctionsOnly(session)) {
functions.addAll(SessionFunctionUtils.listFunctions(session.getSessionFunctions()));
if (isListBuiltInFunctionsOnly(session)) {
functions.addAll(functionNamespaceManagers.entrySet().stream()
.flatMap(manager -> manager.getValue().listFunctions(likePattern, escape).stream()
.filter((functionName) -> functionName.getSignature().getName().getCatalogSchemaName().equals(defaultNamespace)))
.collect(toImmutableList()));
}
else {
functions.addAll(listBuiltInFunctions());
functions.addAll(SessionFunctionUtils.listFunctions(session.getSessionFunctions()));
functions.addAll(functionNamespaceManagers.values().stream()
.flatMap(manager -> manager.listFunctions(likePattern, escape).stream())
.collect(toImmutableList()));
}

return functions.build().stream()
Expand Down
Loading

0 comments on commit c785858

Please sign in to comment.