Skip to content

Commit

Permalink
Add config property 'tpcds.use-varchar-type'
Browse files Browse the repository at this point in the history
Co-authored-by: Pramod Satya <[email protected]>
  • Loading branch information
2 people authored and Pratik Joseph Dabre committed Feb 12, 2025
1 parent 1c43927 commit 1cf3d33
Show file tree
Hide file tree
Showing 16 changed files with 183 additions and 45 deletions.
2 changes: 2 additions & 0 deletions presto-docs/src/main/sphinx/connector/tpcds.rst
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,6 @@ Property Name Description
================================================== ========================================================================== ==============================
``tpcds.splits-per-node`` Number of data splits generated per Presto worker node when querying Number of available processors
data from the TPCDS connector.

``tpcds.use-varchar-type`` When ``true``, all ``char`` columns are exposed as ``varchar`` in the TPC-DS connector. Must be ``true`` on a native execution cluster. false
================================================== ========================================================================== ==============================
Original file line number Diff line number Diff line change
Expand Up @@ -98,26 +98,32 @@ public static DistributedQueryRunner createQueryRunner(Iterable<TpchTable<?>> ta
return createQueryRunner(tables, ImmutableMap.of(), Optional.empty());
}

/**
 * Convenience overload that lets the caller supply TPC-DS catalog properties.
 * <p>
 * Delegates to the ten-argument overload, using {@code "sql-standard"} security,
 * no TPC-DS table names, and empty values for every other optional argument.
 *
 * @param tpchTables TPC-H tables forwarded to the delegate
 * @param extraProperties extra properties forwarded to the delegate
 * @param tpcdsProperties properties forwarded as the TPC-DS catalog configuration
 * @return the query runner built by the delegate overload
 * @throws Exception if the delegate overload fails
 */
public static DistributedQueryRunner createQueryRunner(Iterable<TpchTable<?>> tpchTables, Map<String, String> extraProperties, Map<String, String> tpcdsProperties)
        throws Exception
{
    return createQueryRunner(
            tpchTables,
            ImmutableList.of(),   // tpcdsTableNames
            extraProperties,
            ImmutableMap.of(),    // extraCoordinatorProperties
            "sql-standard",       // security
            ImmutableMap.of(),    // extraHiveProperties
            Optional.empty(),     // workerCount
            Optional.empty(),     // dataDirectory
            Optional.empty(),     // externalWorkerLauncher
            tpcdsProperties);
}

/**
 * Overload taking TPC-H tables, extra properties, extra coordinator properties and an
 * optional data directory. Delegates to a wider overload with {@code "sql-standard"}
 * security, no TPC-DS table names and empty values for the remaining arguments.
 *
 * @throws Exception if the delegate overload fails
 */
public static DistributedQueryRunner createQueryRunner(
Iterable<TpchTable<?>> tpchTables,
Map<String, String> extraProperties,
Map<String, String> extraCoordinatorProperties,
Optional<Path> dataDirectory)
throws Exception
{
// NOTE(review): this listing is a rendered diff without +/- markers. The first return below
// appears to be the removed pre-change call (nine arguments); the second adds a trailing
// empty map for the TPC-DS catalog properties — confirm against the commit.
return createQueryRunner(tpchTables, ImmutableList.of(), extraProperties, extraCoordinatorProperties, "sql-standard", ImmutableMap.of(), Optional.empty(), dataDirectory, Optional.empty());
return createQueryRunner(tpchTables, ImmutableList.of(), extraProperties, extraCoordinatorProperties, "sql-standard", ImmutableMap.of(), Optional.empty(), dataDirectory, Optional.empty(), ImmutableMap.of());
}

/**
 * Overload taking TPC-H tables, extra properties and an optional data directory.
 * Delegates to a wider overload with {@code "sql-standard"} security, no TPC-DS table
 * names, and empty coordinator and Hive properties.
 *
 * @throws Exception if the delegate overload fails
 */
public static DistributedQueryRunner createQueryRunner(Iterable<TpchTable<?>> tpchTables, Map<String, String> extraProperties, Optional<Path> dataDirectory)
throws Exception
{
// NOTE(review): rendered diff without +/- markers. The first return appears to be the removed
// pre-change call; the second passes an additional empty map for the TPC-DS catalog
// properties — confirm against the commit.
return createQueryRunner(tpchTables, ImmutableList.of(), extraProperties, ImmutableMap.of(), "sql-standard", ImmutableMap.of(), Optional.empty(), dataDirectory, Optional.empty());
return createQueryRunner(tpchTables, ImmutableList.of(), extraProperties, ImmutableMap.of(), "sql-standard", ImmutableMap.of(), Optional.empty(), dataDirectory, Optional.empty(), ImmutableMap.of());
}

/**
 * Overload taking TPC-H tables, TPC-DS table names, extra properties and an optional
 * data directory. Delegates to a wider overload with {@code "sql-standard"} security and
 * empty coordinator and Hive properties.
 *
 * @throws Exception if the delegate overload fails
 */
public static DistributedQueryRunner createQueryRunner(Iterable<TpchTable<?>> tpchTables, List<String> tpcdsTableNames, Map<String, String> extraProperties, Optional<Path> dataDirectory)
throws Exception
{
// NOTE(review): rendered diff without +/- markers. The first return appears to be the removed
// pre-change call; the second passes an additional empty map for the TPC-DS catalog
// properties — confirm against the commit.
return createQueryRunner(tpchTables, tpcdsTableNames, extraProperties, ImmutableMap.of(), "sql-standard", ImmutableMap.of(), Optional.empty(), dataDirectory, Optional.empty());
return createQueryRunner(tpchTables, tpcdsTableNames, extraProperties, ImmutableMap.of(), "sql-standard", ImmutableMap.of(), Optional.empty(), dataDirectory, Optional.empty(), ImmutableMap.of());
}

public static DistributedQueryRunner createQueryRunner(
Expand All @@ -128,7 +134,7 @@ public static DistributedQueryRunner createQueryRunner(
Optional<Path> dataDirectory)
throws Exception
{
return createQueryRunner(tpchTables, ImmutableList.of(), extraProperties, ImmutableMap.of(), security, extraHiveProperties, Optional.empty(), dataDirectory, Optional.empty());
return createQueryRunner(tpchTables, ImmutableList.of(), extraProperties, ImmutableMap.of(), security, extraHiveProperties, Optional.empty(), dataDirectory, Optional.empty(), ImmutableMap.of());
}

public static DistributedQueryRunner createQueryRunner(
Expand All @@ -140,10 +146,11 @@ public static DistributedQueryRunner createQueryRunner(
Map<String, String> extraHiveProperties,
Optional<Integer> workerCount,
Optional<Path> dataDirectory,
Optional<BiFunction<Integer, URI, Process>> externalWorkerLauncher)
Optional<BiFunction<Integer, URI, Process>> externalWorkerLauncher,
Map<String, String> tpcdsProperties)
throws Exception
{
return createQueryRunner(tpchTables, tpcdsTableNames, extraProperties, extraCoordinatorProperties, security, extraHiveProperties, workerCount, dataDirectory, externalWorkerLauncher, Optional.empty());
return createQueryRunner(tpchTables, tpcdsTableNames, extraProperties, extraCoordinatorProperties, security, extraHiveProperties, workerCount, dataDirectory, externalWorkerLauncher, Optional.empty(), tpcdsProperties);
}

public static DistributedQueryRunner createQueryRunner(
Expand All @@ -156,7 +163,8 @@ public static DistributedQueryRunner createQueryRunner(
Optional<Integer> workerCount,
Optional<Path> dataDirectory,
Optional<BiFunction<Integer, URI, Process>> externalWorkerLauncher,
Optional<ExtendedHiveMetastore> externalMetastore)
Optional<ExtendedHiveMetastore> externalMetastore,
Map<String, String> tpcdsProperties)
throws Exception
{
return createQueryRunner(
Expand All @@ -170,7 +178,8 @@ public static DistributedQueryRunner createQueryRunner(
dataDirectory,
externalWorkerLauncher,
externalMetastore,
false);
false,
tpcdsProperties);
}

public static DistributedQueryRunner createQueryRunner(
Expand All @@ -184,7 +193,8 @@ public static DistributedQueryRunner createQueryRunner(
Optional<Path> dataDirectory,
Optional<BiFunction<Integer, URI, Process>> externalWorkerLauncher,
Optional<ExtendedHiveMetastore> externalMetastore,
boolean addJmxPlugin)
boolean addJmxPlugin,
Map<String, String> tpcdsProperties)
throws Exception
{
assertEquals(DateTimeZone.getDefault(), TIME_ZONE, "Timezone not configured correctly. Add -Duser.timezone=America/Bahia_Banderas to your JVM arguments");
Expand All @@ -211,7 +221,7 @@ public static DistributedQueryRunner createQueryRunner(
queryRunner.installPlugin(new TpcdsPlugin());
queryRunner.installPlugin(new TestingHiveEventListenerPlugin());
queryRunner.createCatalog("tpch", "tpch");
queryRunner.createCatalog("tpcds", "tpcds");
queryRunner.createCatalog("tpcds", "tpcds", tpcdsProperties);
Map<String, String> tpchProperties = ImmutableMap.<String, String>builder()
.put("tpch.column-naming", "standard")
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
import com.google.common.collect.ImmutableMap;
import org.testng.annotations.Test;

import java.util.Optional;

import static com.facebook.presto.hive.HiveQueryRunner.HIVE_CATALOG;
import static com.facebook.presto.hive.HiveSessionProperties.PARTIAL_AGGREGATION_PUSHDOWN_ENABLED;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.anyTree;
Expand All @@ -47,7 +45,7 @@ protected QueryRunner createQueryRunner()
return HiveQueryRunner.createQueryRunner(
ImmutableList.of(ORDERS),
ImmutableMap.of("native-execution-enabled", "true"),
Optional.empty());
ImmutableMap.of("tpcds.use-varchar-type", "true"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public static DistributedQueryRunner create(
hiveEndpoint.getPort()),
new MetastoreClientConfig(),
HDFS_ENVIRONMENT),
new HivePartitionMutator())));
new HivePartitionMutator())),
ImmutableMap.of());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.UUID;
Expand Down Expand Up @@ -341,6 +342,10 @@ public static QueryRunner createNativeQueryRunner(
coordinatorProperties.put("single-node-execution-enabled", "true");
}

Map<String, String> tpcdsProperties = ImmutableMap.<String, String>builder()
.put("tpcds.use-varchar-type", "true")
.build();

// Make query runner with external workers for tests
return HiveQueryRunner.createQueryRunner(
ImmutableList.of(),
Expand All @@ -356,7 +361,8 @@ public static QueryRunner createNativeQueryRunner(
hiveProperties,
workerCount,
Optional.of(Paths.get(addStorageFormatToPath ? dataDirectory + "/" + storageFormat : dataDirectory)),
getExternalWorkerLauncher("hive", prestoServerPath, cacheMaxSize, remoteFunctionServerUds, failOnNestedLoopJoin, isCoordinatorSidecarEnabled));
getExternalWorkerLauncher("hive", prestoServerPath, cacheMaxSize, remoteFunctionServerUds, failOnNestedLoopJoin, isCoordinatorSidecarEnabled),
tpcdsProperties);
}

public static QueryRunner createNativeCteQueryRunner(boolean useThrift, String storageFormat)
Expand Down Expand Up @@ -399,7 +405,8 @@ public static QueryRunner createNativeCteQueryRunner(boolean useThrift, String s
hiveProperties,
workerCount,
Optional.of(Paths.get(addStorageFormatToPath ? dataDirectory + "/" + storageFormat : dataDirectory)),
getExternalWorkerLauncher("hive", prestoServerPath, cacheMaxSize, Optional.empty(), false, false));
getExternalWorkerLauncher("hive", prestoServerPath, cacheMaxSize, Optional.empty(), false, false),
ImmutableMap.of());
}

public static QueryRunner createNativeQueryRunner(String remoteFunctionServerUds)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,13 @@ public ConnectorHandleResolver getHandleResolver()
public Connector create(String catalogName, Map<String, String> config, ConnectorContext context)
{
int splitsPerNode = getSplitsPerNode(config);
// The Java TPC-DS connector works with either char or varchar columns.
// However, native execution only supports varchar columns, hence in a native cluster `tpcds.use-varchar-type` must be true.
boolean useVarcharType = useVarcharType(config);
if (context.getConnectorSystemConfig().isNativeExecution() && !(useVarcharType)) {
throw new IllegalArgumentException("`tpcds.use-varchar-type` config property is not true for a native cluster");
}

NodeManager nodeManager = context.getNodeManager();
return new Connector()
{
Expand All @@ -76,7 +83,7 @@ public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel
@Override
public ConnectorMetadata getMetadata(ConnectorTransactionHandle transactionHandle)
{
return new TpcdsMetadata();
return new TpcdsMetadata(useVarcharType);
}

@Override
Expand All @@ -88,7 +95,7 @@ public ConnectorSplitManager getSplitManager()
@Override
public ConnectorRecordSetProvider getRecordSetProvider()
{
return new TpcdsRecordSetProvider();
return new TpcdsRecordSetProvider(useVarcharType);
}

@Override
Expand All @@ -109,6 +116,16 @@ private int getSplitsPerNode(Map<String, String> properties)
}
}

/**
 * Reads the {@code tpcds.use-varchar-type} catalog property.
 *
 * @param properties catalog configuration; the property may be absent
 * @return {@code true} if the property equals "true" (ignoring case); {@code false} if it is
 *         absent or equals "false" (ignoring case)
 * @throws IllegalArgumentException if the property is set to any other value
 */
private boolean useVarcharType(Map<String, String> properties)
{
    String value = properties.get("tpcds.use-varchar-type");
    if (value == null) {
        // Property not configured: keep the default of false.
        return false;
    }
    // Boolean.parseBoolean never throws; it maps every string other than "true" to false.
    // The previous catch of NumberFormatException was therefore unreachable and a typo such
    // as "yes" was silently treated as false. Validate explicitly so bad values are rejected.
    if ("true".equalsIgnoreCase(value)) {
        return true;
    }
    if ("false".equalsIgnoreCase(value)) {
        return false;
    }
    throw new IllegalArgumentException("Invalid property tpcds.use-varchar-type: " + value);
}

private boolean isWithNoSexism(Map<String, String> properties)
{
return parseBoolean(firstNonNull(properties.get("tpcds.with-no-sexism"), String.valueOf(false)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,16 @@ public class TpcdsMetadata
private final Set<String> tableNames;
private final TpcdsTableStatisticsFactory tpcdsTableStatisticsFactory = new TpcdsTableStatisticsFactory();

// NOTE(review): rendered diff without +/- markers. The no-argument signature on the next line
// appears to be the removed pre-change constructor header; the one-argument signature further
// down is its replacement — confirm against the commit.
public TpcdsMetadata()
// Flag passed to getPrestoType when building column metadata for tables.
private final boolean useVarcharType;

/**
 * Collects the lower-cased (English locale) names of all base TPC-DS tables and stores
 * the varchar flag.
 *
 * @param useVarcharType value later forwarded to {@code getPrestoType} for every column
 */
public TpcdsMetadata(boolean useVarcharType)
{
ImmutableSet.Builder<String> tableNames = ImmutableSet.builder();
for (Table tpcdsTable : Table.getBaseTables()) {
tableNames.add(tpcdsTable.getName().toLowerCase(ENGLISH));
}
this.tableNames = tableNames.build();
this.useVarcharType = useVarcharType;
}

@Override
Expand Down Expand Up @@ -134,14 +137,14 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect
Table table = Table.getTable(tpcdsTableHandle.getTableName());
String schemaName = scaleFactorSchemaName(tpcdsTableHandle.getScaleFactor());

return getTableMetadata(schemaName, table);
return getTableMetadata(schemaName, table, useVarcharType);
}

private static ConnectorTableMetadata getTableMetadata(String schemaName, Table tpcdsTable)
private static ConnectorTableMetadata getTableMetadata(String schemaName, Table tpcdsTable, boolean useVarcharType)
{
ImmutableList.Builder<ColumnMetadata> columns = ImmutableList.builder();
for (Column column : tpcdsTable.getColumns()) {
columns.add(new ColumnMetadata(column.getName(), getPrestoType(column.getType())));
columns.add(new ColumnMetadata(column.getName(), getPrestoType(column.getType(), useVarcharType)));
}
SchemaTableName tableName = new SchemaTableName(schemaName, tpcdsTable.getName());
return new ConnectorTableMetadata(tableName, columns.build());
Expand Down Expand Up @@ -189,7 +192,7 @@ public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSess
for (String schemaName : getSchemaNames(session, Optional.ofNullable(prefix.getSchemaName()))) {
for (Table tpcdsTable : Table.getBaseTables()) {
if (prefix.getTableName() == null || tpcdsTable.getName().equals(prefix.getTableName())) {
ConnectorTableMetadata tableMetadata = getTableMetadata(schemaName, tpcdsTable);
ConnectorTableMetadata tableMetadata = getTableMetadata(schemaName, tpcdsTable, useVarcharType);
tableColumns.put(new SchemaTableName(schemaName, tpcdsTable.getName()), tableMetadata.getColumns());
}
}
Expand Down Expand Up @@ -243,7 +246,7 @@ public static double schemaNameToScaleFactor(String schemaName)
}
}

public static Type getPrestoType(ColumnType tpcdsType)
public static Type getPrestoType(ColumnType tpcdsType, boolean useVarcharType)
{
switch (tpcdsType.getBase()) {
case IDENTIFIER:
Expand All @@ -255,6 +258,9 @@ public static Type getPrestoType(ColumnType tpcdsType)
case DECIMAL:
return createDecimalType(tpcdsType.getPrecision().get(), tpcdsType.getScale().get());
case CHAR:
if (useVarcharType) {
return createVarcharType(tpcdsType.getPrecision().get());
}
return createCharType(tpcdsType.getPrecision().get());
case VARCHAR:
return createVarcharType(tpcdsType.getPrecision().get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,20 @@ public class TpcdsRecordSet

private final List<Type> columnTypes;

// NOTE(review): rendered diff without +/- markers. The two-argument signature on the next line
// appears to be the removed pre-change constructor header; the three-argument signature further
// down is its replacement — confirm against the commit.
public TpcdsRecordSet(Results results, List<Column> columns)
// Flag forwarded to getPrestoType whenever a column's type is resolved.
private final boolean useVarcharType;

/**
 * Stores the results, takes an immutable copy of the columns, and precomputes the type of
 * each column via {@code getPrestoType}.
 *
 * @param results generated rows; must not be null
 * @param columns columns exposed by this record set
 * @param useVarcharType value forwarded to {@code getPrestoType} for every column
 */
public TpcdsRecordSet(Results results, List<Column> columns, boolean useVarcharType)
{
requireNonNull(results, "results is null");

this.results = results;
this.columns = ImmutableList.copyOf(columns);
ImmutableList.Builder<Type> columnTypes = ImmutableList.builder();
for (Column column : columns) {
// NOTE(review): the line below appears to be the removed pre-change call; the one after it
// passes the varchar flag — confirm against the commit.
columnTypes.add(getPrestoType(column.getType()));
columnTypes.add(getPrestoType(column.getType(), useVarcharType));
}
this.columnTypes = columnTypes.build();
this.useVarcharType = useVarcharType;
}

@Override
Expand Down Expand Up @@ -103,7 +106,7 @@ public long getReadTimeNanos()
/**
 * Returns the type of the column at index {@code field}, resolved through
 * {@code getPrestoType} from the column's TPC-DS type.
 */
@Override
public Type getType(int field)
{
// NOTE(review): rendered diff without +/- markers. The first return appears to be the removed
// pre-change call; the second additionally passes useVarcharType — confirm against the commit.
return getPrestoType(columns.get(field).getType());
return getPrestoType(columns.get(field).getType(), useVarcharType);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@
public class TpcdsRecordSetProvider
implements ConnectorRecordSetProvider
{
// Flag handed to every TpcdsRecordSet this provider creates.
private final boolean useVarcharType;

/**
 * Creates a provider that remembers the varchar flag for the record sets it builds.
 *
 * @param useVarcharType value passed through to each {@code TpcdsRecordSet}
 */
public TpcdsRecordSetProvider(boolean useVarcharType)
{
this.useVarcharType = useVarcharType;
}

@Override
public RecordSet getRecordSet(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorSplit split, List<? extends ColumnHandle> columns)
{
Expand Down Expand Up @@ -63,6 +70,6 @@ private RecordSet getRecordSet(
.withTable(table)
.withNoSexism(noSexism);
Results results = constructResults(table, session);
return new TpcdsRecordSet(results, builder.build());
return new TpcdsRecordSet(results, builder.build(), useVarcharType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package com.facebook.presto.tpcds;

import com.facebook.presto.testing.MaterializedResult;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestQueryFramework;
import org.testng.annotations.Test;

Expand All @@ -25,16 +24,9 @@
import static java.util.stream.Collectors.joining;
import static java.util.stream.IntStream.range;

public class TestTpcds
public abstract class AbstractTestTpcds
extends AbstractTestQueryFramework
{
@Override
protected QueryRunner createQueryRunner()
throws Exception
{
return TpcdsQueryRunner.createQueryRunner();
}

@Test
public void testSelect()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class TestTpcdsMetadataStatistics
{
private static final EstimateAssertion estimateAssertion = new EstimateAssertion(0.01);
private static final ConnectorSession session = null;
private final TpcdsMetadata metadata = new TpcdsMetadata();
private final TpcdsMetadata metadata = new TpcdsMetadata(false);

@Test
public void testNoTableStatsForNotSupportedSchema()
Expand Down
Loading

0 comments on commit 1cf3d33

Please sign in to comment.