diff --git a/pom.xml b/pom.xml
index 8325bcf9e93bc..bfbd0f63d963e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -192,6 +192,7 @@
redis-hbo-provider
presto-singlestore
presto-hana
+ presto-base-arrow-flight
diff --git a/presto-base-arrow-flight/pom.xml b/presto-base-arrow-flight/pom.xml
new file mode 100644
index 0000000000000..d47ec7601db84
--- /dev/null
+++ b/presto-base-arrow-flight/pom.xml
@@ -0,0 +1,322 @@
+
+
+ 4.0.0
+
+ com.facebook.presto
+ presto-root
+ 0.289-SNAPSHOT
+
+ presto-base-arrow-flight
+ presto-base-arrow-flight
+ arrow-flight Connector Plugin for Presto
+
+
+ ${project.parent.basedir}
+ 1.53.0
+ 4.10.0
+ 11.0.0
+ 4.1.100.Final
+
+
+
+
+
+ com.facebook.airlift
+ bootstrap
+
+
+ ch.qos.logback
+ logback-core
+
+
+
+
+
+ com.facebook.airlift
+ log
+
+
+
+ com.google.guava
+ guava
+
+
+ org.checkerframework
+ checker-qual
+
+
+ com.google.errorprone
+ error_prone_annotations
+
+
+
+
+
+ javax.inject
+ javax.inject
+
+
+
+ com.facebook.presto
+ presto-spi
+ provided
+
+
+
+ io.airlift
+ slice
+ provided
+
+
+
+ com.fasterxml.jackson.core
+ jackson-annotations
+ provided
+
+
+
+ com.facebook.presto
+ presto-common
+ provided
+
+
+
+ io.airlift
+ units
+ provided
+
+
+
+ com.google.code.findbugs
+ jsr305
+ true
+
+
+
+ org.apache.arrow
+ arrow-memory-core
+ ${arrow.version}
+
+
+
+ com.google.inject
+ guice
+
+
+
+ com.facebook.airlift
+ configuration
+
+
+
+ io.netty
+ netty-codec-http2
+ ${netty.version}
+
+
+
+ io.netty
+ netty-handler-proxy
+ ${netty.version}
+
+
+ io.netty
+ netty-codec-http
+
+
+
+
+
+ io.netty
+ netty-tcnative-boringssl-static
+ 2.0.65.Final
+
+
+
+ org.apache.arrow
+ flight-core
+ ${arrow.version}
+
+
+ io.netty
+ netty-codec-http2
+
+
+ io.netty
+ netty-codec-http
+
+
+ io.netty
+ netty-handler-proxy
+
+
+
+
+
+ org.apache.arrow
+ arrow-vector
+ ${arrow.version}
+
+
+ com.fasterxml.jackson.datatype
+ jackson-datatype-jsr310
+
+
+
+
+
+
+ org.testng
+ testng
+ test
+
+
+
+ com.facebook.airlift
+ json
+ test
+
+
+
+ org.mockito
+ mockito-core
+ 5.11.0
+
+
+ org.objenesis
+ objenesis
+
+
+ test
+
+
+
+ com.facebook.presto
+ presto-testng-services
+ test
+
+
+
+ com.facebook.airlift
+ testing
+ test
+ ${dep.airlift.version}
+
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+
+
+
+
+
+
+ org.codehaus.mojo
+ animal-sniffer-annotations
+ 1.21
+
+
+
+ com.google.errorprone
+ error_prone_annotations
+ 2.14.0
+
+
+
+ com.google.protobuf
+ protobuf-java
+ 3.21.7
+
+
+
+ io.grpc
+ grpc-protobuf
+ 1.53.0
+
+
+
+
+ io.grpc
+ grpc-stub
+ 1.53.0
+
+
+
+ io.grpc
+ grpc-core
+ 1.53.0
+
+
+
+ io.grpc
+ grpc-api
+ 1.53.0
+
+
+
+ io.grpc
+ grpc-context
+ 1.53.0
+
+
+
+ com.fasterxml.jackson.datatype
+ jackson-datatype-jsr310
+ 2.13.4
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-enforcer-plugin
+
+
+
+
+ com.google.errorprone:error_prone_annotations
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-dependency-plugin
+
+
+ io.netty:netty-codec-http2
+ io.netty:netty-handler-proxy
+ io.netty:netty-tcnative-boringssl-static
+
+
+
+
+ org.basepom.maven
+ duplicate-finder-maven-plugin
+ 1.2.1
+
+
+ module-info
+ META-INF.versions.9.module-info
+
+
+ arrow-git.properties
+
+
+
+
+
+ check
+
+
+
+
+
+
+
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowAbstractFlightRequest.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowAbstractFlightRequest.java
new file mode 100644
index 0000000000000..1336676b1582f
--- /dev/null
+++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowAbstractFlightRequest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import java.util.Optional;
+
+public abstract class ArrowAbstractFlightRequest
+ implements ArrowFlightRequest
+{
+ private final String schema;
+ private final String table;
+ private final Optional query;
+
+ public ArrowAbstractFlightRequest(String schema)
+ {
+ this.schema = schema;
+ this.query = Optional.empty();
+ this.table = null;
+ }
+
+ public ArrowAbstractFlightRequest(String schema, String table)
+ {
+ this.schema = schema;
+ this.table = table;
+ query = Optional.empty();
+ }
+
+ public ArrowAbstractFlightRequest(String schema, String table, Optional query)
+ {
+ this.schema = schema;
+ this.table = table;
+ this.query = query;
+ }
+
+ public String getSchema()
+ {
+ return schema;
+ }
+
+ public String getTable()
+ {
+ return table;
+ }
+
+ public Optional getQuery()
+ {
+ return query;
+ }
+}
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowAbstractMetadata.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowAbstractMetadata.java
new file mode 100644
index 0000000000000..9384667d0ed52
--- /dev/null
+++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowAbstractMetadata.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import com.facebook.airlift.log.Logger;
+import com.facebook.presto.common.type.BigintType;
+import com.facebook.presto.common.type.BooleanType;
+import com.facebook.presto.common.type.DateType;
+import com.facebook.presto.common.type.DecimalType;
+import com.facebook.presto.common.type.DoubleType;
+import com.facebook.presto.common.type.IntegerType;
+import com.facebook.presto.common.type.RealType;
+import com.facebook.presto.common.type.SmallintType;
+import com.facebook.presto.common.type.TimeType;
+import com.facebook.presto.common.type.TimestampType;
+import com.facebook.presto.common.type.TinyintType;
+import com.facebook.presto.common.type.VarbinaryType;
+import com.facebook.presto.common.type.VarcharType;
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ColumnMetadata;
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.ConnectorTableHandle;
+import com.facebook.presto.spi.ConnectorTableLayout;
+import com.facebook.presto.spi.ConnectorTableLayoutHandle;
+import com.facebook.presto.spi.ConnectorTableLayoutResult;
+import com.facebook.presto.spi.ConnectorTableMetadata;
+import com.facebook.presto.spi.Constraint;
+import com.facebook.presto.spi.NotFoundException;
+import com.facebook.presto.spi.SchemaTableName;
+import com.facebook.presto.spi.SchemaTablePrefix;
+import com.facebook.presto.spi.connector.ConnectorMetadata;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.arrow.flight.FlightInfo;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static com.facebook.plugin.arrow.ArrowErrorCode.ARROW_FLIGHT_ERROR;
+import static java.util.Objects.requireNonNull;
+
+public abstract class ArrowAbstractMetadata
+ implements ConnectorMetadata
+{
+ private static final Logger logger = Logger.get(ArrowAbstractMetadata.class);
+ private final ArrowFlightConfig config;
+ private final ArrowFlightClientHandler clientHandler;
+
+ public ArrowAbstractMetadata(ArrowFlightConfig config, ArrowFlightClientHandler clientHandler)
+ {
+ this.config = config;
+ this.clientHandler = requireNonNull(clientHandler);
+ }
+
+ protected abstract String getDBSpecificSchemaName(ArrowFlightConfig config, String schemaName);
+
+ protected abstract String getDBSpecificTableName(ArrowFlightConfig config, String tableName);
+
+ @Override
+ public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName)
+ {
+ if (!listSchemaNames(session).contains(tableName.getSchemaName())) {
+ return null;
+ }
+
+ if (!listTables(session, Optional.ofNullable(tableName.getSchemaName())).contains(tableName)) {
+ return null;
+ }
+ return new ArrowTableHandle(tableName.getSchemaName(), tableName.getTableName());
+ }
+
+ protected abstract ArrowFlightRequest getArrowFlightRequest(ArrowFlightConfig config, Optional query, String schema, String table);
+
+ protected abstract ArrowFlightRequest getArrowFlightRequest(ArrowFlightConfig config, String schema);
+
+ public List getColumnsList(String schema, String table, ConnectorSession connectorSession)
+ {
+ try {
+ String dbSpecificSchemaName = getDBSpecificSchemaName(config, schema);
+ String dbSpecificTableName = getDBSpecificTableName(config, table);
+ ArrowFlightRequest request = getArrowFlightRequest(clientHandler.getConfig(), Optional.empty(),
+ dbSpecificSchemaName, dbSpecificTableName);
+
+ FlightInfo flightInfo = clientHandler.getFlightInfo(request, connectorSession);
+ List fields = flightInfo.getSchema().getFields();
+ return fields;
+ }
+ catch (Exception e) {
+ throw new ArrowException(ARROW_FLIGHT_ERROR, "The table columns could not be listed for the table " + table, e);
+ }
+ }
+
+ @Override
+ public Map getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle)
+ {
+ Map column = new HashMap<>();
+
+ String schemaValue = ((ArrowTableHandle) tableHandle).getSchema();
+ String tableValue = ((ArrowTableHandle) tableHandle).getTable();
+ String dbSpecificSchemaValue = getDBSpecificSchemaName(config, schemaValue);
+ String dBSpecificTableName = getDBSpecificTableName(config, tableValue);
+ List columnList = getColumnsList(dbSpecificSchemaValue, dBSpecificTableName, session);
+
+ ArrowTypeHandle typeHandle = new ArrowTypeHandle(1, "typename", 10, 3, Optional.empty());
+ for (Field field : columnList) {
+ String columnName = field.getName();
+ logger.debug("The value of the flight columnName is:- %s", columnName);
+ switch (field.getType().getTypeID()) {
+ case Int:
+ ArrowType.Int intType = (ArrowType.Int) field.getType();
+ switch (intType.getBitWidth()) {
+ case 64:
+ column.put(columnName, new ArrowColumnHandle(columnName, BigintType.BIGINT, typeHandle));
+ break;
+ case 32:
+ column.put(columnName, new ArrowColumnHandle(columnName, IntegerType.INTEGER, typeHandle));
+ break;
+ case 16:
+ column.put(columnName, new ArrowColumnHandle(columnName, SmallintType.SMALLINT, typeHandle));
+ break;
+ case 8:
+ column.put(columnName, new ArrowColumnHandle(columnName, TinyintType.TINYINT, typeHandle));
+ break;
+ default:
+ throw new ArrowException(ARROW_FLIGHT_ERROR, "Invalid bit width " + intType.getBitWidth());
+ }
+ break;
+ case Binary:
+ case LargeBinary:
+ case FixedSizeBinary:
+ column.put(columnName, new ArrowColumnHandle(columnName, VarbinaryType.VARBINARY, typeHandle));
+ break;
+ case Date:
+ column.put(columnName, new ArrowColumnHandle(columnName, DateType.DATE, typeHandle));
+ break;
+ case Timestamp:
+ column.put(columnName, new ArrowColumnHandle(columnName, TimestampType.TIMESTAMP, typeHandle));
+ break;
+ case Utf8:
+ case LargeUtf8:
+ column.put(columnName, new ArrowColumnHandle(columnName, VarcharType.VARCHAR, typeHandle));
+ break;
+ case FloatingPoint:
+ ArrowType.FloatingPoint floatingPoint = (ArrowType.FloatingPoint) field.getType();
+ switch (floatingPoint.getPrecision()) {
+ case SINGLE:
+ column.put(columnName, new ArrowColumnHandle(columnName, RealType.REAL, typeHandle));
+ break;
+ case DOUBLE:
+ column.put(columnName, new ArrowColumnHandle(columnName, DoubleType.DOUBLE, typeHandle));
+ break;
+ default:
+ throw new ArrowException(ARROW_FLIGHT_ERROR, "Invalid floating point precision " + floatingPoint.getPrecision());
+ }
+ break;
+ case Decimal:
+ ArrowType.Decimal decimalType = (ArrowType.Decimal) field.getType();
+ int precision = decimalType.getPrecision();
+ int scale = decimalType.getScale();
+ column.put(columnName, new ArrowColumnHandle(columnName, DecimalType.createDecimalType(precision, scale), typeHandle));
+ break;
+ case Bool:
+ column.put(columnName, new ArrowColumnHandle(columnName, BooleanType.BOOLEAN, typeHandle));
+ break;
+ case Time:
+ column.put(columnName, new ArrowColumnHandle(columnName, TimeType.TIME, typeHandle));
+ break;
+ default:
+ throw new UnsupportedOperationException("The data type " + field.getType().getTypeID() + " is not supported.");
+ }
+ }
+ return column;
+ }
+
+ @Override
+ public List getTableLayouts(ConnectorSession session, ConnectorTableHandle table, Constraint constraint, Optional> desiredColumns)
+ {
+ ArrowTableHandle tableHandle = (ArrowTableHandle) table;
+
+ List columns = new ArrayList<>();
+ if (desiredColumns.isPresent()) {
+ List arrowColumns = new ArrayList<>(desiredColumns.get());
+ columns = (List) (List>) arrowColumns;
+ }
+
+ ConnectorTableLayout layout = new ConnectorTableLayout(new ArrowTableLayoutHandle(tableHandle, columns, constraint.getSummary(), Optional.empty()));
+ return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary()));
+ }
+
+ @Override
+ public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTableLayoutHandle handle)
+ {
+ return new ConnectorTableLayout(handle);
+ }
+
+ @Override
+ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table)
+ {
+ List meta = new ArrayList<>();
+ List columnList = getColumnsList(((ArrowTableHandle) table).getSchema(), ((ArrowTableHandle) table).getTable(), session);
+
+ for (Field field : columnList) {
+ String columnName = field.getName();
+ switch (field.getType().getTypeID()) {
+ case Int:
+ ArrowType.Int intType = (ArrowType.Int) field.getType();
+ switch (intType.getBitWidth()) {
+ case 64:
+ meta.add(new ColumnMetadata(columnName, BigintType.BIGINT));
+ break;
+ case 32:
+ meta.add(new ColumnMetadata(columnName, IntegerType.INTEGER));
+ break;
+ case 16:
+ meta.add(new ColumnMetadata(columnName, SmallintType.SMALLINT));
+ break;
+ case 8:
+ meta.add(new ColumnMetadata(columnName, TinyintType.TINYINT));
+ break;
+ default:
+ throw new ArrowException(ARROW_FLIGHT_ERROR, "Invalid bit width " + intType.getBitWidth());
+ }
+ break;
+ case Binary:
+ case LargeBinary:
+ case FixedSizeBinary:
+ meta.add(new ColumnMetadata(columnName, VarbinaryType.VARBINARY));
+ break;
+ case Date:
+ meta.add(new ColumnMetadata(columnName, DateType.DATE));
+ break;
+ case Timestamp:
+ meta.add(new ColumnMetadata(columnName, TimestampType.TIMESTAMP));
+ break;
+ case Utf8:
+ case LargeUtf8:
+ meta.add(new ColumnMetadata(columnName, VarcharType.VARCHAR));
+ break;
+ case FloatingPoint:
+ ArrowType.FloatingPoint floatingPoint = (ArrowType.FloatingPoint) field.getType();
+ switch (floatingPoint.getPrecision()) {
+ case SINGLE:
+ meta.add(new ColumnMetadata(columnName, RealType.REAL)); // Float4
+ break;
+ case DOUBLE:
+ meta.add(new ColumnMetadata(columnName, DoubleType.DOUBLE)); // Float8
+ break;
+ default:
+ throw new ArrowException(ARROW_FLIGHT_ERROR, "Invalid floating point precision " + floatingPoint.getPrecision());
+ }
+ break;
+ case Decimal:
+ ArrowType.Decimal decimalType = (ArrowType.Decimal) field.getType();
+ int precision = decimalType.getPrecision();
+ int scale = decimalType.getScale();
+ meta.add(new ColumnMetadata(columnName, DecimalType.createDecimalType(precision, scale)));
+ break;
+ case Time:
+ meta.add(new ColumnMetadata(columnName, TimeType.TIME));
+ break;
+ case Bool:
+ meta.add(new ColumnMetadata(columnName, BooleanType.BOOLEAN));
+ break;
+ default:
+ throw new UnsupportedOperationException("The data type " + field.getType().getTypeID() + " is not supported.");
+ }
+ }
+ return new ConnectorTableMetadata(new SchemaTableName(((ArrowTableHandle) table).getSchema(), ((ArrowTableHandle) table).getTable()), meta);
+ }
+
+ @Override
+ public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle)
+ {
+ return ((ArrowColumnHandle) columnHandle).getColumnMetadata();
+ }
+
+ @Override
+ public Map> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix)
+ {
+ requireNonNull(prefix, "prefix is null");
+ ImmutableMap.Builder> columns = ImmutableMap.builder();
+ List tables;
+ if (prefix.getSchemaName() != null && prefix.getTableName() != null) {
+ tables = ImmutableList.of(new SchemaTableName(prefix.getSchemaName(), prefix.getTableName()));
+ }
+ else {
+ tables = listTables(session, Optional.of(prefix.getSchemaName()));
+ }
+
+ for (SchemaTableName tableName : tables) {
+ try {
+ ConnectorTableHandle tableHandle = getTableHandle(session, tableName);
+ columns.put(tableName, getTableMetadata(session, tableHandle).getColumns());
+ }
+ catch (ClassCastException | NotFoundException e) {
+ throw new ArrowException(ARROW_FLIGHT_ERROR, "The table columns could not be listed for the table " + tableName, e);
+ }
+ catch (Exception e) {
+ throw new ArrowException(ARROW_FLIGHT_ERROR, e.getMessage(), e);
+ }
+ }
+ return columns.build();
+ }
+}
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowAbstractSplitManager.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowAbstractSplitManager.java
new file mode 100644
index 0000000000000..b4e82a3ef694c
--- /dev/null
+++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowAbstractSplitManager.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import com.facebook.airlift.log.Logger;
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.ConnectorSplitSource;
+import com.facebook.presto.spi.ConnectorTableLayoutHandle;
+import com.facebook.presto.spi.FixedSplitSource;
+import com.facebook.presto.spi.connector.ConnectorSplitManager;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import com.google.common.collect.ImmutableMap;
+import org.apache.arrow.flight.FlightInfo;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+public abstract class ArrowAbstractSplitManager
+ implements ConnectorSplitManager
+{
+ private static final Logger logger = Logger.get(ArrowAbstractSplitManager.class);
+ private final String identifierQuote;
+ private final String literalQuote;
+ private final ArrowFlightClientHandler clientHandler;
+
+ public ArrowAbstractSplitManager(ArrowFlightClientHandler client, String identifierQuote, String literalQuote)
+ {
+ this.clientHandler = client;
+ this.identifierQuote = identifierQuote;
+ this.literalQuote = literalQuote;
+ }
+
+ protected abstract ArrowFlightRequest getArrowFlightRequest(ArrowFlightConfig config, Optional query,
+ String schema, String table);
+
+ @Override
+ public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout, SplitSchedulingContext splitSchedulingContext)
+ {
+ ArrowTableLayoutHandle tableLayoutHandle = (ArrowTableLayoutHandle) layout;
+ ArrowTableHandle tableHandle = tableLayoutHandle.getTableHandle();
+ Optional query = Optional.of(new ArrowQueryBuilder(literalQuote, identifierQuote).buildSql(tableHandle.getSchema(),
+ tableHandle.getTable(),
+ ((ArrowTableLayoutHandle) layout).getColumnHandles(), ImmutableMap.of(),
+ tableLayoutHandle.getTupleDomain(),
+ tableLayoutHandle.getAdditionalPredicate()));
+
+ ArrowFlightRequest request = getArrowFlightRequest(clientHandler.getConfig(),
+ query, tableHandle.getSchema(), tableHandle.getTable());
+
+ FlightInfo flightInfo = clientHandler.getFlightInfo(request, session);
+ List splits = flightInfo.getEndpoints()
+ .stream()
+ .map(info -> new ArrowSplit(
+ tableHandle.getSchema(),
+ tableHandle.getTable(),
+ info.getTicket().getBytes(),
+ info.getLocations().stream().map(location -> location.getUri().toString()).collect(Collectors.toList())))
+ .collect(Collectors.collectingAndThen(Collectors.toList(), Collections::unmodifiableList));
+ logger.info("created %d splits from arrow tickets", splits.size());
+ return new FixedSplitSource(splits);
+ }
+}
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowColumnHandle.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowColumnHandle.java
new file mode 100644
index 0000000000000..42bc2abc11a8f
--- /dev/null
+++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowColumnHandle.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import com.facebook.presto.common.type.Type;
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ColumnMetadata;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import static java.util.Objects.requireNonNull;
+
+public class ArrowColumnHandle
+ implements ColumnHandle
+{
+ private final String columnName;
+ private final Type columnType;
+ private final ArrowTypeHandle arrowTypeHandle;
+
+ @JsonCreator
+ public ArrowColumnHandle(
+ @JsonProperty("columnName") String columnName,
+ @JsonProperty("columnType") Type columnType,
+ @JsonProperty("jdbcTypeHandle") ArrowTypeHandle arrowTypeHandle)
+ {
+ this.columnName = requireNonNull(columnName, "columnName is null");
+ this.columnType = requireNonNull(columnType, "type is null");
+ this.arrowTypeHandle = arrowTypeHandle;
+ }
+
+ @JsonProperty
+ public String getColumnName()
+ {
+ return columnName;
+ }
+
+ @JsonProperty
+ public Type getColumnType()
+ {
+ return columnType;
+ }
+
+ public ArrowTypeHandle getArrowTypeHandle()
+ {
+ return arrowTypeHandle;
+ }
+
+ public ColumnMetadata getColumnMetadata()
+ {
+ return new ColumnMetadata(columnName, columnType);
+ }
+}
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowConnector.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowConnector.java
new file mode 100644
index 0000000000000..9523bd210faa5
--- /dev/null
+++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowConnector.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import com.facebook.presto.spi.ConnectorHandleResolver;
+import com.facebook.presto.spi.connector.Connector;
+import com.facebook.presto.spi.connector.ConnectorMetadata;
+import com.facebook.presto.spi.connector.ConnectorPageSourceProvider;
+import com.facebook.presto.spi.connector.ConnectorSplitManager;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import com.facebook.presto.spi.transaction.IsolationLevel;
+import com.google.inject.Inject;
+
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+
+public class ArrowConnector
+ implements Connector
+{
+ private final ConnectorMetadata metadata;
+ private final ConnectorSplitManager splitManager;
+ private final ConnectorPageSourceProvider pageSourceProvider;
+ private final ConnectorHandleResolver handleResolver;
+
+ @Inject
+ public ArrowConnector(ConnectorMetadata metadata,
+ ConnectorHandleResolver handleResolver,
+ ConnectorSplitManager splitManager,
+ ConnectorPageSourceProvider pageSourceProvider)
+ {
+ this.metadata = requireNonNull(metadata, "Metadata is null");
+ this.handleResolver = requireNonNull(handleResolver, "Metadata is null");
+ this.splitManager = requireNonNull(splitManager, "SplitManager is null");
+ this.pageSourceProvider = requireNonNull(pageSourceProvider, "PageSinkProvider is null");
+ }
+
+ public Optional getHandleResolver()
+ {
+ return Optional.of(handleResolver);
+ }
+
+ @Override
+ public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly)
+ {
+ return ArrowTransactionHandle.INSTANCE;
+ }
+
+ @Override
+ public ConnectorMetadata getMetadata(ConnectorTransactionHandle transactionHandle)
+ {
+ return metadata;
+ }
+
+ @Override
+ public ConnectorSplitManager getSplitManager()
+ {
+ return splitManager;
+ }
+
+ @Override
+ public ConnectorPageSourceProvider getPageSourceProvider()
+ {
+ return pageSourceProvider;
+ }
+}
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowConnectorFactory.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowConnectorFactory.java
new file mode 100644
index 0000000000000..f17317dd42b3a
--- /dev/null
+++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowConnectorFactory.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import com.facebook.airlift.bootstrap.Bootstrap;
+import com.facebook.presto.common.type.TypeManager;
+import com.facebook.presto.spi.ConnectorHandleResolver;
+import com.facebook.presto.spi.NodeManager;
+import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
+import com.facebook.presto.spi.connector.Connector;
+import com.facebook.presto.spi.connector.ConnectorContext;
+import com.facebook.presto.spi.connector.ConnectorFactory;
+import com.facebook.presto.spi.function.FunctionMetadataManager;
+import com.facebook.presto.spi.function.StandardFunctionResolution;
+import com.facebook.presto.spi.relation.RowExpressionService;
+import com.google.inject.ConfigurationException;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+
+import java.util.Map;
+
+import static com.facebook.plugin.arrow.ArrowErrorCode.ARROW_FLIGHT_ERROR;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static com.google.common.base.Throwables.throwIfUnchecked;
+import static java.util.Objects.requireNonNull;
+
+public class ArrowConnectorFactory
+ implements ConnectorFactory
+{
+ private final String name;
+ private final Module module;
+ private final ClassLoader classLoader;
+
+ public ArrowConnectorFactory(String name, Module module, ClassLoader classLoader)
+ {
+ checkArgument(!isNullOrEmpty(name), "name is null or empty");
+ this.name = name;
+ this.module = requireNonNull(module, "module is null");
+ this.classLoader = requireNonNull(classLoader, "classLoader is null");
+ }
+
+ @Override
+ public String getName()
+ {
+ return name;
+ }
+
+ @Override
+ public ConnectorHandleResolver getHandleResolver()
+ {
+ return new ArrowHandleResolver();
+ }
+
+ @Override
+ public Connector create(String catalogName, Map requiredConfig, ConnectorContext context)
+ {
+ requireNonNull(requiredConfig, "requiredConfig is null");
+
+ try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+ Bootstrap app = new Bootstrap(
+ binder -> {
+ binder.bind(TypeManager.class).toInstance(context.getTypeManager());
+ binder.bind(FunctionMetadataManager.class).toInstance(context.getFunctionMetadataManager());
+ binder.bind(StandardFunctionResolution.class).toInstance(context.getStandardFunctionResolution());
+ binder.bind(RowExpressionService.class).toInstance(context.getRowExpressionService());
+ binder.bind(NodeManager.class).toInstance(context.getNodeManager());
+ },
+ new ArrowModule(catalogName),
+ module);
+
+ Injector injector = app
+ .doNotInitializeLogging()
+ .setRequiredConfigurationProperties(requiredConfig)
+ .initialize();
+
+ return injector.getInstance(ArrowConnector.class);
+ }
+ catch (ConfigurationException ex) {
+ throw new ArrowException(ARROW_FLIGHT_ERROR, "The connector instance could not be created.", ex);
+ }
+ catch (Exception e) {
+ throwIfUnchecked(e);
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowConnectorId.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowConnectorId.java
new file mode 100644
index 0000000000000..dce08bac4ac24
--- /dev/null
+++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowConnectorId.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import java.util.Objects;
+
+import static java.util.Objects.requireNonNull;
+
+public class ArrowConnectorId
+{
+ private final String id;
+
+ public ArrowConnectorId(String id)
+ {
+ this.id = requireNonNull(id, "id is null");
+ }
+
+ @Override
+ public String toString()
+ {
+ return id;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(id);
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj) {
+ return true;
+ }
+ if ((obj == null) || (getClass() != obj.getClass())) {
+ return false;
+ }
+ ArrowConnectorId other = (ArrowConnectorId) obj;
+ return Objects.equals(this.id, other.id);
+ }
+}
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowErrorCode.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowErrorCode.java
new file mode 100644
index 0000000000000..2e33f736a62c5
--- /dev/null
+++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowErrorCode.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import com.facebook.presto.common.ErrorCode;
+import com.facebook.presto.common.ErrorType;
+import com.facebook.presto.spi.ErrorCodeSupplier;
+
+import static com.facebook.presto.common.ErrorType.EXTERNAL;
+import static com.facebook.presto.common.ErrorType.INTERNAL_ERROR;
+
+public enum ArrowErrorCode
+ implements ErrorCodeSupplier
+{
+ ARROW_INVALID_TABLE(0, EXTERNAL),
+ ARROW_INVALID_CREDENTAILS(1, EXTERNAL),
+ ARROW_FLIGHT_ERROR(2, EXTERNAL),
+ ARROW_INTERNAL_ERROR(3, INTERNAL_ERROR);
+
+ private final ErrorCode errorCode;
+
+ ArrowErrorCode(int code, ErrorType type)
+ {
+ errorCode = new ErrorCode(code + 0x0509_0000, name(), type);
+ }
+
+ @Override
+ public ErrorCode toErrorCode()
+ {
+ return errorCode;
+ }
+}
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowException.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowException.java
new file mode 100644
index 0000000000000..ba2c6edba589c
--- /dev/null
+++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import com.facebook.presto.spi.ErrorCodeSupplier;
+import com.facebook.presto.spi.PrestoException;
+
+public class ArrowException
+ extends PrestoException
+{
+ public ArrowException(ErrorCodeSupplier errorCode, String message)
+ {
+ super(errorCode, message);
+ }
+
+ public ArrowException(ErrorCodeSupplier errorCode, String message, Throwable cause)
+ {
+ super(errorCode, message, cause);
+ }
+}
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowExpression.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowExpression.java
new file mode 100644
index 0000000000000..b5c30bfcaf4a0
--- /dev/null
+++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowExpression.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import com.facebook.presto.spi.relation.ConstantExpression;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+import java.util.Objects;
+
+import static java.util.Objects.requireNonNull;
+
+public class ArrowExpression
+{
+ private final String expression;
+ private final List boundConstantValues;
+
+ public ArrowExpression(String expression)
+ {
+ this(expression, ImmutableList.of());
+ }
+
+ @JsonCreator
+ public ArrowExpression(
+ @JsonProperty("translatedString") String expression,
+ @JsonProperty("boundConstantValues") List constantBindValues)
+ {
+ this.expression = requireNonNull(expression, "expression is null");
+ this.boundConstantValues = requireNonNull(constantBindValues, "boundConstantValues is null");
+ }
+
+ @JsonProperty
+ public String getExpression()
+ {
+ return expression;
+ }
+
+ /**
+ * Constant expressions are not added to the expression String. Instead they appear as "?" in the query.
+ * This is because we would potentially lose precision on double values. Hence when we make a PreparedStatement
+ * out of the SQL string replacing every "?" by it's corresponding actual bindValue.
+ *
+ * @return List of constants to replace in the SQL string.
+ */
+ @JsonProperty
+ public List getBoundConstantValues()
+ {
+ return boundConstantValues;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ArrowExpression that = (ArrowExpression) o;
+ return expression.equals(that.expression) &&
+ boundConstantValues.equals(that.boundConstantValues);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(expression, boundConstantValues);
+ }
+}
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightClient.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightClient.java
new file mode 100644
index 0000000000000..3d3550beba9f7
--- /dev/null
+++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightClient.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import org.apache.arrow.flight.FlightClient;
+
+import java.io.InputStream;
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+
+public class ArrowFlightClient
+{
+ private final FlightClient flightClient;
+ private final Optional trustedCertificate;
+
+ public ArrowFlightClient(FlightClient flightClient, Optional trustedCertificate)
+ {
+ this.flightClient = requireNonNull(flightClient, "flightClient cannot be null");
+ this.trustedCertificate = trustedCertificate;
+ }
+
+ public FlightClient getFlightClient()
+ {
+ return flightClient;
+ }
+
+ public Optional getTrustedCertificate()
+ {
+ return trustedCertificate;
+ }
+
+ public void close() throws Exception
+ {
+ flightClient.close();
+ if (trustedCertificate.isPresent()) {
+ trustedCertificate.get().close();
+ }
+ }
+}
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightClientHandler.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightClientHandler.java
new file mode 100644
index 0000000000000..4cbdc5f114f78
--- /dev/null
+++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightClientHandler.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import com.facebook.airlift.log.Logger;
+import com.facebook.presto.spi.ConnectorSession;
+import org.apache.arrow.flight.FlightClient;
+import org.apache.arrow.flight.FlightDescriptor;
+import org.apache.arrow.flight.FlightInfo;
+import org.apache.arrow.flight.Location;
+import org.apache.arrow.flight.grpc.CredentialCallOption;
+import org.apache.arrow.memory.RootAllocator;
+
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Optional;
+import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static com.facebook.plugin.arrow.ArrowErrorCode.ARROW_FLIGHT_ERROR;
+
+public abstract class ArrowFlightClientHandler
+{
+ private static final Logger logger = Logger.get(ArrowFlightClientHandler.class);
+ private AtomicBoolean isClientClosed = new AtomicBoolean(true);
+ private final ArrowFlightConfig config;
+ private ScheduledExecutorService scheduledExecutorService;
+ private ArrowFlightClient arrowFlightClient;
+ private Optional trustedCertificate = Optional.empty();
+ private TimerTask closeTask;
+ private static final int TIMER_DURATION_IN_MINUTES = 30;
+
+ public ArrowFlightClientHandler(ArrowFlightConfig config)
+ {
+ this.config = config;
+ }
+
+ private void initializeClient(Optional uri)
+ {
+ if (!isClientClosed.get()) {
+ return;
+ }
+ try {
+ RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);
+ Optional trustedCertificate = Optional.empty();
+
+ Location location;
+ if (uri.isPresent()) {
+ location = new Location(uri.get());
+ }
+ else {
+ if (config.getArrowFlightServerSslEnabled() != null && !config.getArrowFlightServerSslEnabled()) {
+ location = Location.forGrpcInsecure(config.getFlightServerName(), config.getArrowFlightPort());
+ }
+ else {
+ location = Location.forGrpcTls(config.getFlightServerName(), config.getArrowFlightPort());
+ }
+ }
+
+ logger.debug("location %s", location.getUri().toString());
+
+ FlightClient.Builder flightClientBuilder = FlightClient.builder(allocator, location);
+ if (config.getVerifyServer() != null && !config.getVerifyServer()) {
+ flightClientBuilder.verifyServer(false);
+ }
+ else if (config.getFlightServerSSLCertificate() != null) {
+ trustedCertificate = Optional.of(new FileInputStream(config.getFlightServerSSLCertificate()));
+ flightClientBuilder.trustedCertificates(trustedCertificate.get()).useTls();
+ }
+
+ FlightClient flightClient = flightClientBuilder.build();
+ this.arrowFlightClient = new ArrowFlightClient(flightClient, trustedCertificate);
+ isClientClosed.set(false);
+ }
+ catch (Exception ex) {
+ throw new ArrowException(ARROW_FLIGHT_ERROR, "The flight client could not be obtained." + ex.getMessage(), ex);
+ }
+ }
+
+ public ArrowFlightConfig getConfig()
+ {
+ return config;
+ }
+
+ public ArrowFlightClient getClient(Optional uri)
+ {
+ if (isClientClosed.get()) {
+ logger.info("Reinitialize the client if closed or not initialized");
+ initializeClient(uri);
+ scheduleCloseTask();
+ }
+ else {
+ resetTimer(); // Reset timer when client is reused
+ }
+ return this.arrowFlightClient;
+ }
+
+ public FlightInfo getFlightInfo(ArrowFlightRequest request, ConnectorSession connectorSession)
+ {
+ try {
+ ArrowFlightClient client = getClient(Optional.empty());
+ CredentialCallOption auth = this.getCallOptions(connectorSession);
+ FlightDescriptor descriptor = FlightDescriptor.command(request.getCommand());
+ logger.debug("Fetching flight info");
+ FlightInfo flightInfo = client.getFlightClient().getInfo(descriptor, ArrowFlightConstants.CALL_OPTIONS_TIMEOUT, auth);
+ logger.debug("got flight info");
+ return flightInfo;
+ }
+ catch (Exception e) {
+ throw new ArrowException(ARROW_FLIGHT_ERROR, "The flight information could not be obtained from the flight server." + e.getMessage(), e);
+ }
+ }
+
+ protected abstract CredentialCallOption getCallOptions(ConnectorSession connectorSession);
+
+ public void close() throws Exception
+ {
+ if (arrowFlightClient != null) {
+ arrowFlightClient.close();
+ arrowFlightClient = null;
+ }
+ if (trustedCertificate.isPresent()) {
+ trustedCertificate.get().close();
+ }
+ shutdownTimer();
+ isClientClosed.set(true);
+ }
+
+ private void scheduleCloseTask()
+ {
+ scheduledExecutorService = Executors.newScheduledThreadPool(1);
+ Runnable closeTask = () -> {
+ try {
+ close();
+ logger.info("in closeTask");
+ }
+ catch (Exception e) {
+ logger.error(e);
+ }
+ scheduledExecutorService.shutdown();
+ };
+ scheduledExecutorService.schedule(closeTask, TIMER_DURATION_IN_MINUTES, TimeUnit.MINUTES);
+ }
+
+ public void resetTimer()
+ {
+ shutdownTimer();
+ scheduleCloseTask();
+ }
+
+ public void shutdownTimer()
+ {
+ if (scheduledExecutorService != null) {
+ scheduledExecutorService.shutdownNow();
+ }
+ }
+}
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightConfig.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightConfig.java
new file mode 100644
index 0000000000000..ab082d84880bb
--- /dev/null
+++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightConfig.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import com.facebook.airlift.configuration.Config;
+import com.facebook.airlift.configuration.ConfigSecuritySensitive;
+
+public class ArrowFlightConfig
+{
+ private String server;
+ private String host;
+ private String database;
+ private String username;
+ private String password;
+ private String name;
+ private Integer port;
+ private Boolean ssl;
+ private Boolean verifyServer;
+ private String flightServerSSLCertificate;
+ private Boolean arrowFlightServerSslEnabled;
+ private Integer arrowFlightPort;
+ public String getFlightServerName()
+ {
+ return server;
+ }
+
+ public String getDataSourceHost()
+ {
+ return host;
+ }
+
+ public String getDataSourceDatabase()
+ {
+ return database;
+ }
+
+ public String getDataSourceUsername()
+ {
+ return username;
+ }
+
+ public String getDataSourcePassword()
+ {
+ return password;
+ }
+
+ public String getDataSourceName()
+ {
+ return name;
+ }
+
+ public Integer getDataSourcePort()
+ {
+ return port;
+ }
+
+ public Boolean getDataSourceSSL()
+ {
+ return ssl;
+ }
+
+ public Boolean getVerifyServer()
+ {
+ return verifyServer;
+ }
+
+ public Boolean getArrowFlightServerSslEnabled()
+ {
+ return arrowFlightServerSslEnabled;
+ }
+
+ public String getFlightServerSSLCertificate()
+ {
+ return flightServerSSLCertificate;
+ }
+
+ public Integer getArrowFlightPort()
+ {
+ return arrowFlightPort;
+ }
+
+ @Config("arrow-flight.server")
+ public ArrowFlightConfig setFlightServerName(String server)
+ {
+ this.server = server;
+ return this;
+ }
+
+ @Config("data-source.host")
+ public ArrowFlightConfig setDataSourceHost(String host)
+ {
+ this.host = host;
+ return this;
+ }
+
+ @Config("data-source.database")
+ public ArrowFlightConfig setDataSourceDatabase(String database)
+ {
+ this.database = database;
+ return this;
+ }
+
+ @Config("data-source.username")
+ public ArrowFlightConfig setDataSourceUsername(String username)
+ {
+ this.username = username;
+ return this;
+ }
+
+ @Config("data-source.password")
+ @ConfigSecuritySensitive
+ public ArrowFlightConfig setDataSourcePassword(String password)
+ {
+ this.password = password;
+ return this;
+ }
+
+ @Config("data-source.name")
+ public ArrowFlightConfig setDataSourceName(String name)
+ {
+ this.name = name;
+ return this;
+ }
+
+ @Config("data-source.port")
+ public ArrowFlightConfig setDataSourcePort(Integer port)
+ {
+ this.port = port;
+ return this;
+ }
+
+ @Config("data-source.ssl")
+ public ArrowFlightConfig setDataSourceSSL(Boolean ssl)
+ {
+ this.ssl = ssl;
+ return this;
+ }
+
+ @Config("arrow-flight.server.verify")
+ public ArrowFlightConfig setVerifyServer(Boolean verifyServer)
+ {
+ this.verifyServer = verifyServer;
+ return this;
+ }
+
+ @Config("arrow-flight.server.port")
+ public ArrowFlightConfig setArrowFlightPort(Integer arrowFlightPort)
+ {
+ this.arrowFlightPort = arrowFlightPort;
+ return this;
+ }
+
+ @Config("arrow-flight.server-ssl-certificate")
+ public ArrowFlightConfig setFlightServerSSLCertificate(String flightServerSSLCertificate)
+ {
+ this.flightServerSSLCertificate = flightServerSSLCertificate;
+ return this;
+ }
+
+ @Config("arrow-flight.server-ssl-enabled")
+ public ArrowFlightConfig setArrowFlightServerSslEnabled(Boolean arrowFlightServerSslEnabled)
+ {
+ this.arrowFlightServerSslEnabled = arrowFlightServerSslEnabled;
+ return this;
+ }
+}
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightConstants.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightConstants.java
new file mode 100644
index 0000000000000..4e405f6f25dba
--- /dev/null
+++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightConstants.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import org.apache.arrow.flight.CallOption;
+import org.apache.arrow.flight.CallOptions;
+
+import java.util.concurrent.TimeUnit;
+
+public class ArrowFlightConstants
+{
+ public static final CallOption CALL_OPTIONS_TIMEOUT = CallOptions.timeout(5, TimeUnit.MINUTES);
+
+ private ArrowFlightConstants()
+ {
+ }
+}
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightRequest.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightRequest.java
new file mode 100644
index 0000000000000..7e04e0a6066e3
--- /dev/null
+++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightRequest.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+public interface ArrowFlightRequest
+{
+ byte[] getCommand();
+}
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowHandleResolver.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowHandleResolver.java
new file mode 100644
index 0000000000000..8b231b98a6ee6
--- /dev/null
+++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowHandleResolver.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ConnectorHandleResolver;
+import com.facebook.presto.spi.ConnectorSplit;
+import com.facebook.presto.spi.ConnectorTableHandle;
+import com.facebook.presto.spi.ConnectorTableLayoutHandle;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+
+public class ArrowHandleResolver
+ implements ConnectorHandleResolver
+{
+ @Override
+ public Class extends ConnectorTableHandle> getTableHandleClass()
+ {
+ return ArrowTableHandle.class;
+ }
+
+ @Override
+ public Class extends ConnectorTableLayoutHandle> getTableLayoutHandleClass()
+ {
+ return ArrowTableLayoutHandle.class;
+ }
+
+ @Override
+ public Class extends ColumnHandle> getColumnHandleClass()
+ {
+ return ArrowColumnHandle.class;
+ }
+
+ @Override
+ public Class extends ConnectorSplit> getSplitClass()
+ {
+ return ArrowSplit.class;
+ }
+
+ @Override
+ public Class extends ConnectorTransactionHandle> getTransactionHandleClass()
+ {
+ return ArrowTransactionHandle.class;
+ }
+}
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowModule.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowModule.java
new file mode 100644
index 0000000000000..b20762f8ad497
--- /dev/null
+++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowModule.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import com.facebook.presto.spi.ConnectorHandleResolver;
+import com.facebook.presto.spi.connector.Connector;
+import com.facebook.presto.spi.connector.ConnectorPageSourceProvider;
+import com.google.inject.Binder;
+import com.google.inject.Module;
+import com.google.inject.Scopes;
+
+import static com.facebook.airlift.configuration.ConfigBinder.configBinder;
+import static java.util.Objects.requireNonNull;
+
+public class ArrowModule
+ implements Module
+{
+ protected final String connectorId;
+
+ public ArrowModule(String connectorId)
+ {
+ this.connectorId = requireNonNull(connectorId, "connector id is null");
+ }
+
+ public void configure(Binder binder)
+ {
+ configBinder(binder).bindConfig(ArrowFlightConfig.class);
+ binder.bind(ArrowConnector.class).in(Scopes.SINGLETON);
+ binder.bind(ArrowConnectorId.class).toInstance(new ArrowConnectorId(connectorId));
+ binder.bind(ConnectorHandleResolver.class).to(ArrowHandleResolver.class).in(Scopes.SINGLETON);
+ binder.bind(ArrowPageSourceProvider.class).in(Scopes.SINGLETON);
+ binder.bind(ConnectorPageSourceProvider.class).to(ArrowPageSourceProvider.class).in(Scopes.SINGLETON);
+ binder.bind(Connector.class).to(ArrowConnector.class).in(Scopes.SINGLETON);
+ }
+}
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowPageSource.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowPageSource.java
new file mode 100644
index 0000000000000..d6afe6f80075d
--- /dev/null
+++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowPageSource.java
@@ -0,0 +1,567 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import com.facebook.airlift.log.Logger;
+import com.facebook.presto.common.Page;
+import com.facebook.presto.common.block.Block;
+import com.facebook.presto.common.block.BlockBuilder;
+import com.facebook.presto.common.type.DateType;
+import com.facebook.presto.common.type.DecimalType;
+import com.facebook.presto.common.type.Decimals;
+import com.facebook.presto.common.type.TimeType;
+import com.facebook.presto.common.type.TimestampType;
+import com.facebook.presto.common.type.Type;
+import com.facebook.presto.common.type.VarcharType;
+import com.facebook.presto.spi.ConnectorPageSource;
+import com.facebook.presto.spi.ConnectorSession;
+import io.airlift.slice.Slice;
+import io.airlift.slice.Slices;
+import org.apache.arrow.flight.FlightRuntimeException;
+import org.apache.arrow.flight.FlightStream;
+import org.apache.arrow.flight.Ticket;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.DateMilliVector;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.NullVector;
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TimeMicroVector;
+import org.apache.arrow.vector.TimeMilliVector;
+import org.apache.arrow.vector.TimeSecVector;
+import org.apache.arrow.vector.TimeStampMicroVector;
+import org.apache.arrow.vector.TimeStampMilliVector;
+import org.apache.arrow.vector.TimeStampSecVector;
+import org.apache.arrow.vector.TinyIntVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import static com.facebook.plugin.arrow.ArrowErrorCode.ARROW_FLIGHT_ERROR;
+
+public class ArrowPageSource
+ implements ConnectorPageSource
+{
+ private final ArrowSplit split;
+ private final List columnHandles;
+ private boolean completed;
+ private int currentPosition;
+ private ArrowFlightClient flightClient;
+ private FlightStream flightStream;
+ private Logger logger = Logger.get(ArrowPageSource.class);
+ private final ArrowFlightClientHandler clientHandler;
+ private Optional vectorSchemaRoot = Optional.empty();
+
+ public ArrowPageSource(ArrowSplit split, List columnHandles, ArrowFlightClientHandler clientHandler,
+ ConnectorSession connectorSession)
+ {
+ this.columnHandles = columnHandles;
+ this.split = split;
+ this.clientHandler = clientHandler;
+ getFlightStream(clientHandler, split.getTicket(), connectorSession);
+ }
+
+ private void getFlightStream(ArrowFlightClientHandler clientHandler, byte[] ticket, ConnectorSession connectorSession)
+ {
+ try {
+ Optional uri = split.getLocationUrls().isEmpty() ? Optional.empty() : Optional.of(split.getLocationUrls().get(0));
+ flightClient = clientHandler.getClient(uri);
+ flightStream = flightClient.getFlightClient().getStream(new Ticket(ticket),
+ ArrowFlightConstants.CALL_OPTIONS_TIMEOUT, clientHandler.getCallOptions(connectorSession));
+ }
+ catch (FlightRuntimeException e) {
+ throw new ArrowException(ARROW_FLIGHT_ERROR, e.getMessage(), e);
+ }
+ }
+
+ private Block buildBlockFromVector(FieldVector vector, Type type)
+ {
+ if (vector instanceof BitVector) {
+ return buildBlockFromBitVector((BitVector) vector, type);
+ }
+ else if (vector instanceof TinyIntVector) {
+ return buildBlockFromTinyIntVector((TinyIntVector) vector, type);
+ }
+ else if (vector instanceof IntVector) {
+ return buildBlockFromIntVector((IntVector) vector, type);
+ }
+ else if (vector instanceof SmallIntVector) {
+ return buildBlockFromSmallIntVector((SmallIntVector) vector, type);
+ }
+ else if (vector instanceof BigIntVector) {
+ return buildBlockFromBigIntVector((BigIntVector) vector, type);
+ }
+ else if (vector instanceof DecimalVector) {
+ return buildBlockFromDecimalVector((DecimalVector) vector, type);
+ }
+ else if (vector instanceof NullVector) {
+ return buildBlockFromNullVector((NullVector) vector, type);
+ }
+ else if (vector instanceof TimeStampMicroVector) {
+ return buildBlockFromTimeStampMicroVector((TimeStampMicroVector) vector, type);
+ }
+ else if (vector instanceof TimeStampMilliVector) {
+ return buildBlockFromTimeStampMilliVector((TimeStampMilliVector) vector, type);
+ }
+ else if (vector instanceof Float4Vector) {
+ return buildBlockFromFloat4Vector((Float4Vector) vector, type);
+ }
+ else if (vector instanceof Float8Vector) {
+ return buildBlockFromFloat8Vector((Float8Vector) vector, type);
+ }
+ else if (vector instanceof VarCharVector) {
+ return buildBlockFromVarCharVector((VarCharVector) vector, type);
+ }
+ else if (vector instanceof VarBinaryVector) {
+ return buildBlockFromVarBinaryVector((VarBinaryVector) vector, type);
+ }
+ else if (vector instanceof DateDayVector) {
+ return buildBlockFromDateDayVector((DateDayVector) vector, type);
+ }
+ else if (vector instanceof DateMilliVector) {
+ return buildBlockFromDateMilliVector((DateMilliVector) vector, type);
+ }
+ else if (vector instanceof TimeMilliVector) {
+ return buildBlockFromTimeMilliVector((TimeMilliVector) vector, type);
+ }
+ else if (vector instanceof TimeSecVector) {
+ return buildBlockFromTimeSecVector((TimeSecVector) vector, type);
+ }
+ else if (vector instanceof TimeStampSecVector) {
+ return buildBlockFromTimeStampSecVector((TimeStampSecVector) vector, type);
+ }
+ else if (vector instanceof TimeMicroVector) {
+ return buildBlockFromTimeMicroVector((TimeMicroVector) vector, type);
+ }
+ throw new UnsupportedOperationException("Unsupported vector type: " + vector.getClass().getSimpleName());
+ }
+
+ private Block buildBlockFromBitVector(BitVector vector, Type type)
+ {
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ type.writeBoolean(builder, vector.get(i) == 1);
+ }
+ }
+ return builder.build();
+ }
+
+ private Block buildBlockFromIntVector(IntVector vector, Type type)
+ {
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ type.writeLong(builder, vector.get(i));
+ }
+ }
+ return builder.build();
+ }
+
+ private Block buildBlockFromSmallIntVector(SmallIntVector vector, Type type)
+ {
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ type.writeLong(builder, vector.get(i));
+ }
+ }
+ return builder.build();
+ }
+
+ private Block buildBlockFromTinyIntVector(TinyIntVector vector, Type type)
+ {
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ type.writeLong(builder, vector.get(i));
+ }
+ }
+ return builder.build();
+ }
+
+ private Block buildBlockFromBigIntVector(BigIntVector vector, Type type)
+ {
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ type.writeLong(builder, vector.get(i));
+ }
+ }
+ return builder.build();
+ }
+
+ private Block buildBlockFromDecimalVector(DecimalVector vector, Type type)
+ {
+ if (!(type instanceof DecimalType)) {
+ throw new IllegalArgumentException("Type must be a DecimalType for DecimalVector");
+ }
+
+ DecimalType decimalType = (DecimalType) type;
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ BigDecimal decimal = vector.getObject(i); // Get the BigDecimal value
+ if (decimalType.isShort()) {
+ builder.writeLong(decimal.unscaledValue().longValue());
+ }
+ else {
+ Slice slice = Decimals.encodeScaledValue(decimal);
+ decimalType.writeSlice(builder, slice, 0, slice.length());
+ }
+ }
+ }
+ return builder.build();
+ }
+
+ private Block buildBlockFromNullVector(NullVector vector, Type type)
+ {
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ builder.appendNull();
+ }
+ return builder.build();
+ }
+
+ private Block buildBlockFromTimeStampMicroVector(TimeStampMicroVector vector, Type type)
+ {
+ if (!(type instanceof TimestampType)) {
+ throw new IllegalArgumentException("Expected TimestampType but got " + type.getClass().getName());
+ }
+
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ long micros = vector.get(i);
+ long millis = TimeUnit.MICROSECONDS.toMillis(micros);
+ type.writeLong(builder, millis);
+ }
+ }
+ return builder.build();
+ }
+
+ private Block buildBlockFromTimeStampMilliVector(TimeStampMilliVector vector, Type type)
+ {
+ if (!(type instanceof TimestampType)) {
+ throw new IllegalArgumentException("Expected TimestampType but got " + type.getClass().getName());
+ }
+
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ long millis = vector.get(i);
+ type.writeLong(builder, millis);
+ }
+ }
+ return builder.build();
+ }
+
+ private Block buildBlockFromFloat8Vector(Float8Vector vector, Type type)
+ {
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ type.writeDouble(builder, vector.get(i));
+ }
+ }
+ return builder.build();
+ }
+
+ private Block buildBlockFromFloat4Vector(Float4Vector vector, Type type)
+ {
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ int intBits = Float.floatToIntBits(vector.get(i));
+ type.writeLong(builder, intBits);
+ }
+ }
+ return builder.build();
+ }
+
+ private Block buildBlockFromVarBinaryVector(VarBinaryVector vector, Type type)
+ {
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ byte[] value = vector.get(i);
+ type.writeSlice(builder, Slices.wrappedBuffer(value));
+ }
+ }
+ return builder.build();
+ }
+
+ private Block buildBlockFromVarCharVector(VarCharVector vector, Type type)
+ {
+ if (!(type instanceof VarcharType)) {
+ throw new IllegalArgumentException("Expected VarcharType but got " + type.getClass().getName());
+ }
+
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ String value = new String(vector.get(i), StandardCharsets.UTF_8);
+ type.writeSlice(builder, Slices.utf8Slice(value));
+ }
+ }
+ return builder.build();
+ }
+
+ private Block buildBlockFromDateDayVector(DateDayVector vector, Type type)
+ {
+ if (!(type instanceof DateType)) {
+ throw new IllegalArgumentException("Expected DateType but got " + type.getClass().getName());
+ }
+
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ type.writeLong(builder, vector.get(i));
+ }
+ }
+ return builder.build();
+ }
+
+ private Block buildBlockFromDateMilliVector(DateMilliVector vector, Type type)
+ {
+ if (!(type instanceof DateType)) {
+ throw new IllegalArgumentException("Expected DateType but got " + type.getClass().getName());
+ }
+
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ DateType dateType = (DateType) type;
+ long days = TimeUnit.MILLISECONDS.toDays(vector.get(i));
+ dateType.writeLong(builder, days);
+ }
+ }
+ return builder.build();
+ }
+
+ private Block buildBlockFromTimeSecVector(TimeSecVector vector, Type type)
+ {
+ if (!(type instanceof TimeType)) {
+ throw new IllegalArgumentException("Type must be a TimeType for TimeSecVector");
+ }
+
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ int value = vector.get(i);
+ long millis = TimeUnit.SECONDS.toMillis(value);
+ type.writeLong(builder, millis);
+ }
+ }
+ return builder.build();
+ }
+
+ private Block buildBlockFromTimeMilliVector(TimeMilliVector vector, Type type)
+ {
+ if (!(type instanceof TimeType)) {
+ throw new IllegalArgumentException("Type must be a TimeType for TimeSecVector");
+ }
+
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ int value = vector.get(i);
+ long millis = TimeUnit.MILLISECONDS.toMillis(value);
+ type.writeLong(builder, millis);
+ }
+ }
+ return builder.build();
+ }
+
+ private Block buildBlockFromTimeMicroVector(TimeMicroVector vector, Type type)
+ {
+ if (!(type instanceof TimeType)) {
+ throw new IllegalArgumentException("Type must be a TimeType for TimemicroVector");
+ }
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ long value = vector.get(i);
+ long micro = TimeUnit.MICROSECONDS.toMillis(value);
+ type.writeLong(builder, micro);
+ }
+ }
+ return builder.build();
+ }
+
+ private Block buildBlockFromTimeStampSecVector(TimeStampSecVector vector, Type type)
+ {
+ if (!(type instanceof TimestampType)) {
+ throw new IllegalArgumentException("Type must be a TimestampType for TimeStampSecVector");
+ }
+
+ BlockBuilder builder = type.createBlockBuilder(null, vector.getValueCount());
+ for (int i = 0; i < vector.getValueCount(); i++) {
+ if (vector.isNull(i)) {
+ builder.appendNull();
+ }
+ else {
+ long value = vector.get(i);
+ long millis = TimeUnit.SECONDS.toMillis(value);
+ type.writeLong(builder, millis);
+ }
+ }
+ return builder.build();
+ }
+
+ @Override
+ public long getCompletedBytes()
+ {
+ return 0;
+ }
+
+ @Override
+ public long getCompletedPositions()
+ {
+ return currentPosition;
+ }
+
+ @Override
+ public long getReadTimeNanos()
+ {
+ return 0;
+ }
+
+ @Override
+ public boolean isFinished()
+ {
+ return completed;
+ }
+
+ @Override
+ public long getSystemMemoryUsage()
+ {
+ return 0;
+ }
+
+ @Override
+ public Page getNextPage()
+ {
+ if (vectorSchemaRoot.isPresent()) {
+ vectorSchemaRoot.get().close();
+ vectorSchemaRoot = Optional.empty();
+ }
+
+ if (flightStream.next()) {
+ vectorSchemaRoot = Optional.ofNullable(flightStream.getRoot());
+ }
+
+ if (!vectorSchemaRoot.isPresent()) {
+ completed = true;
+ }
+
+ if (isFinished()) {
+ return null;
+ }
+
+ currentPosition++;
+
+ List blocks = new ArrayList<>();
+ for (int columnIndex = 0; columnIndex < columnHandles.size(); columnIndex++) {
+ FieldVector vector = vectorSchemaRoot.get().getVector(columnIndex);
+ Type type = columnHandles.get(columnIndex).getColumnType();
+
+ Block block = buildBlockFromVector(vector, type);
+ blocks.add(block);
+ }
+
+ return new Page(vectorSchemaRoot.get().getRowCount(), blocks.toArray(new Block[0]));
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ if (vectorSchemaRoot.isPresent()) {
+ vectorSchemaRoot.get().close();
+ }
+ if (flightStream != null) {
+ try {
+ flightStream.close();
+ }
+ catch (Exception e) {
+ logger.error(e);
+ }
+ }
+ }
+}
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowPageSourceProvider.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowPageSourceProvider.java
new file mode 100644
index 0000000000000..f3bb41c3e35d4
--- /dev/null
+++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowPageSourceProvider.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import com.facebook.airlift.log.Logger;
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ConnectorPageSource;
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.ConnectorSplit;
+import com.facebook.presto.spi.SplitContext;
+import com.facebook.presto.spi.connector.ConnectorPageSourceProvider;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import com.google.common.collect.ImmutableList;
+
+import javax.inject.Inject;
+
+import java.util.List;
+
+public class ArrowPageSourceProvider
+ implements ConnectorPageSourceProvider
+{
+ private static final Logger logger = Logger.get(ArrowPageSourceProvider.class);
+ private ArrowFlightClientHandler clientHandler;
+ @Inject
+ public ArrowPageSourceProvider(ArrowFlightClientHandler clientHandler)
+ {
+ this.clientHandler = clientHandler;
+ }
+
+ @Override
+ public ConnectorPageSource createPageSource(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorSplit split, List columns, SplitContext splitContext)
+ {
+ ImmutableList.Builder columnHandles = ImmutableList.builder();
+ for (ColumnHandle handle : columns) {
+ columnHandles.add((ArrowColumnHandle) handle);
+ }
+ ArrowSplit arrowSplit = (ArrowSplit) split;
+ logger.debug("Processing split with flight ticket");
+ return new ArrowPageSource(arrowSplit, columnHandles.build(), clientHandler, session);
+ }
+}
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowPlugin.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowPlugin.java
new file mode 100644
index 0000000000000..7c072f9b4ffcc
--- /dev/null
+++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowPlugin.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import com.facebook.presto.spi.Plugin;
+import com.facebook.presto.spi.connector.ConnectorFactory;
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Module;
+
+import static com.google.common.base.MoreObjects.firstNonNull;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static java.util.Objects.requireNonNull;
+
+public class ArrowPlugin
+ implements Plugin
+{
+ private static ClassLoader getClassLoader()
+ {
+ return firstNonNull(Thread.currentThread().getContextClassLoader(), ArrowPlugin.class.getClassLoader());
+ }
+
+ protected String name;
+ protected Module module;
+
+ public ArrowPlugin(String name, Module module)
+ {
+ checkArgument(!isNullOrEmpty(name), "name is null or empty");
+ this.name = name;
+ this.module = requireNonNull(module, "module is null");
+ }
+
+ @Override
+ public Iterable getConnectorFactories()
+ {
+ return ImmutableList.of(new ArrowConnectorFactory(name, module, getClassLoader()));
+ }
+}
diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowQueryBuilder.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowQueryBuilder.java
new file mode 100644
index 0000000000000..ef1a2090d877d
--- /dev/null
+++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowQueryBuilder.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.plugin.arrow;
+
+import com.facebook.presto.common.predicate.Domain;
+import com.facebook.presto.common.predicate.Range;
+import com.facebook.presto.common.predicate.TupleDomain;
+import com.facebook.presto.common.type.BigintType;
+import com.facebook.presto.common.type.BooleanType;
+import com.facebook.presto.common.type.CharType;
+import com.facebook.presto.common.type.DateType;
+import com.facebook.presto.common.type.DoubleType;
+import com.facebook.presto.common.type.IntegerType;
+import com.facebook.presto.common.type.RealType;
+import com.facebook.presto.common.type.SmallintType;
+import com.facebook.presto.common.type.TimeType;
+import com.facebook.presto.common.type.TimestampType;
+import com.facebook.presto.common.type.TinyintType;
+import com.facebook.presto.common.type.Type;
+import com.facebook.presto.common.type.VarcharType;
+import com.facebook.presto.spi.ColumnHandle;
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import io.airlift.slice.Slice;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static com.google.common.collect.Iterables.getOnlyElement;
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.joining;
+
+public class ArrowQueryBuilder
+{
+ private static final String ALWAYS_TRUE = "1=1";
+ private static final String ALWAYS_FALSE = "1=0";
+ private final String quote;
+ private final String identifierQuote;
+
+ private String addColumnExpression(List columns, Map columnExpressions)
+ {
+ if (columns.isEmpty()) {
+ return "null";
+ }
+
+ return columns.stream()
+ .map(jdbcColumnHandle -> {
+ String columnAlias = quote(jdbcColumnHandle.getColumnName());
+ String expression = columnExpressions.get(jdbcColumnHandle.getColumnName());
+ if (expression == null) {
+ return columnAlias;
+ }
+ return format("%s AS %s", expression, columnAlias);
+ })
+ .collect(joining(", "));
+ }
+
+ private static boolean isAcceptedType(Type type)
+ {
+ Type validType = requireNonNull(type, "type is null");
+ return validType.equals(BigintType.BIGINT) ||
+ validType.equals(TinyintType.TINYINT) ||
+ validType.equals(SmallintType.SMALLINT) ||
+ validType.equals(IntegerType.INTEGER) ||
+ validType.equals(DoubleType.DOUBLE) ||
+ validType.equals(RealType.REAL) ||
+ validType.equals(BooleanType.BOOLEAN) ||
+ validType.equals(DateType.DATE) ||
+ validType.equals(TimeType.TIME) ||
+ validType.equals(TimestampType.TIMESTAMP) ||
+ validType instanceof VarcharType ||
+ validType instanceof CharType;
+ }
+ private List toConjuncts(List columns, TupleDomain tupleDomain, List accumulator)
+ {
+ ImmutableList.Builder builder = ImmutableList.builder();
+ for (ArrowColumnHandle column : columns) {
+ Type type = column.getColumnType();
+ if (isAcceptedType(type)) {
+ Domain domain = tupleDomain.getDomains().get().get(column);
+ if (domain != null) {
+ builder.add(toPredicate(column.getColumnName(), domain, column, accumulator));
+ }
+ }
+ }
+ return builder.build();
+ }
+
+ private String toPredicate(String columnName, Domain domain, ArrowColumnHandle columnHandle, List accumulator)
+ {
+ checkArgument(domain.getType().isOrderable(), "Domain type must be orderable");
+
+ if (domain.getValues().isNone()) {
+ return domain.isNullAllowed() ? quote(columnName) + " IS NULL" : ALWAYS_FALSE;
+ }
+
+ if (domain.getValues().isAll()) {
+ return domain.isNullAllowed() ? ALWAYS_TRUE : quote(columnName) + " IS NOT NULL";
+ }
+
+ List disjuncts = new ArrayList<>();
+ List