From 6d9d75b88a5b8142c597b23b1912d18f39365b30 Mon Sep 17 00:00:00 2001 From: Sai Date: Mon, 29 Jul 2024 18:44:38 +0530 Subject: [PATCH] changes for arrow connector --- .../plugin/arrow/ArrowAbstractMetadata.java | 20 +-- .../arrow/ArrowAbstractSplitManager.java | 4 +- .../arrow/ArrowFlightClientHandler.java | 110 ++++++------ .../plugin/arrow/ArrowPageSource.java | 160 +++++++++--------- 4 files changed, 147 insertions(+), 147 deletions(-) diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowAbstractMetadata.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowAbstractMetadata.java index c128a54e526ee..2769d057a13b3 100644 --- a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowAbstractMetadata.java +++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowAbstractMetadata.java @@ -69,10 +69,6 @@ public ArrowAbstractMetadata(ArrowFlightConfig config, ArrowFlightClientHandler this.clientHandler = requireNonNull(clientHandler); } - protected abstract String getDataSourceSpecificSchemaName(ArrowFlightConfig config, String schemaName); - - protected abstract String getDataSourceSpecificTableName(ArrowFlightConfig config, String tableName); - @Override public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName) { @@ -86,10 +82,6 @@ public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTable return new ArrowTableHandle(tableName.getSchemaName(), tableName.getTableName()); } - protected abstract ArrowFlightRequest getArrowFlightRequest(ArrowFlightConfig config, Optional query, String schema, String table); - - protected abstract ArrowFlightRequest getArrowFlightRequest(ArrowFlightConfig config, String schema); - public List getColumnsList(String schema, String table, ConnectorSession connectorSession) { try { @@ -256,10 +248,10 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect ArrowType.FloatingPoint floatingPoint = (ArrowType.FloatingPoint) field.getType(); switch (floatingPoint.getPrecision()) { case SINGLE: - meta.add(new ColumnMetadata(columnName, RealType.REAL)); // Float4 + meta.add(new ColumnMetadata(columnName, RealType.REAL)); break; case DOUBLE: - meta.add(new ColumnMetadata(columnName, DoubleType.DOUBLE)); // Float8 + meta.add(new ColumnMetadata(columnName, DoubleType.DOUBLE)); break; default: throw new ArrowException(ARROW_FLIGHT_ERROR, "Invalid floating point precision " + floatingPoint.getPrecision()); @@ -317,4 +309,12 @@ public Map> listTableColumns(ConnectorSess } return columns.build(); } + + protected abstract ArrowFlightRequest getArrowFlightRequest(ArrowFlightConfig config, Optional query, String schema, String table); + + protected abstract ArrowFlightRequest getArrowFlightRequest(ArrowFlightConfig config, String schema); + + protected abstract String getDataSourceSpecificSchemaName(ArrowFlightConfig config, String schemaName); + + protected abstract String getDataSourceSpecificTableName(ArrowFlightConfig config, String tableName); } diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowAbstractSplitManager.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowAbstractSplitManager.java index 90236604b797b..ddd63a6ba5006 100644 --- a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowAbstractSplitManager.java +++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowAbstractSplitManager.java @@ -37,8 +37,6 @@ public ArrowAbstractSplitManager(ArrowFlightClientHandler client) this.clientHandler = client; } - protected abstract ArrowFlightRequest getArrowFlightRequest(ArrowFlightConfig config, ArrowTableLayoutHandle tableLayoutHandle); - @Override public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout, SplitSchedulingContext splitSchedulingContext) { @@ -59,4 +57,6 @@ public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHand logger.info("created %d splits from arrow tickets", splits.size()); return new FixedSplitSource(splits); } + + protected abstract ArrowFlightRequest getArrowFlightRequest(ArrowFlightConfig config, ArrowTableLayoutHandle tableLayoutHandle); } diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightClientHandler.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightClientHandler.java index e67ad87982c47..86671fd6a7090 100644 --- a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightClientHandler.java +++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowFlightClientHandler.java @@ -48,46 +48,6 @@ public ArrowFlightClientHandler(ArrowFlightConfig config) this.config = config; } - private void initializeClient(Optional uri) - { - if (!isClientClosed.get()) { - return; - } - try { - allocator = new RootAllocator(Long.MAX_VALUE); - Optional trustedCertificate = Optional.empty(); - - Location location; - if (uri.isPresent()) { - location = new Location(uri.get()); - } - else { - if (config.getArrowFlightServerSslEnabled() != null && !config.getArrowFlightServerSslEnabled()) { - location = Location.forGrpcInsecure(config.getFlightServerName(), config.getArrowFlightPort()); - } - else { - location = Location.forGrpcTls(config.getFlightServerName(), config.getArrowFlightPort()); - } - } - - FlightClient.Builder flightClientBuilder = FlightClient.builder(allocator, location); - if (config.getVerifyServer() != null && !config.getVerifyServer()) { - flightClientBuilder.verifyServer(false); - } - else if (config.getFlightServerSSLCertificate() != null) { - trustedCertificate = Optional.of(new FileInputStream(config.getFlightServerSSLCertificate())); - flightClientBuilder.trustedCertificates(trustedCertificate.get()).useTls(); - } - - FlightClient flightClient = flightClientBuilder.build(); - this.arrowFlightClient = new ArrowFlightClient(flightClient, trustedCertificate, allocator); - isClientClosed.set(false); - } - catch (Exception ex) { - throw new ArrowException(ARROW_FLIGHT_ERROR, "The flight client could not be obtained." + ex.getMessage(), ex); - } - } - public ArrowFlightConfig getConfig() { return config; @@ -122,8 +82,6 @@ public FlightInfo getFlightInfo(ArrowFlightRequest request, ConnectorSession con } } - protected abstract CredentialCallOption getCallOptions(ConnectorSession connectorSession); - public synchronized void close() throws Exception { if (arrowFlightClient != null) { @@ -137,6 +95,61 @@ public synchronized void close() throws Exception isClientClosed.set(true); } + public void resetTimer() + { + shutdownTimer(); + scheduleCloseTask(); + } + + public void shutdownTimer() + { + if (scheduledExecutorService != null) { + scheduledExecutorService.shutdownNow(); + } + } + + protected abstract CredentialCallOption getCallOptions(ConnectorSession connectorSession); + + private void initializeClient(Optional uri) + { + if (!isClientClosed.get()) { + return; + } + try { + allocator = new RootAllocator(Long.MAX_VALUE); + Optional trustedCertificate = Optional.empty(); + + Location location; + if (uri.isPresent()) { + location = new Location(uri.get()); + } + else { + if (config.getArrowFlightServerSslEnabled() != null && !config.getArrowFlightServerSslEnabled()) { + location = Location.forGrpcInsecure(config.getFlightServerName(), config.getArrowFlightPort()); + } + else { + location = Location.forGrpcTls(config.getFlightServerName(), config.getArrowFlightPort()); + } + } + + FlightClient.Builder flightClientBuilder = FlightClient.builder(allocator, location); + if (config.getVerifyServer() != null && !config.getVerifyServer()) { + flightClientBuilder.verifyServer(false); + } + else if (config.getFlightServerSSLCertificate() != null) { + trustedCertificate = Optional.of(new FileInputStream(config.getFlightServerSSLCertificate())); + flightClientBuilder.trustedCertificates(trustedCertificate.get()).useTls(); + } + + FlightClient flightClient = flightClientBuilder.build(); + this.arrowFlightClient = new ArrowFlightClient(flightClient, trustedCertificate, allocator); + isClientClosed.set(false); + } + catch (Exception ex) { + throw new ArrowException(ARROW_FLIGHT_ERROR, "The flight client could not be obtained." + ex.getMessage(), ex); + } + } + private void scheduleCloseTask() { scheduledExecutorService = Executors.newScheduledThreadPool(1); @@ -152,17 +165,4 @@ private void scheduleCloseTask() }; scheduledExecutorService.schedule(closeTask, TIMER_DURATION_IN_MINUTES, TimeUnit.MINUTES); } - - public void resetTimer() - { - shutdownTimer(); - scheduleCloseTask(); - } - - public void shutdownTimer() - { - if (scheduledExecutorService != null) { - scheduledExecutorService.shutdownNow(); - } - } } diff --git a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowPageSource.java b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowPageSource.java index cba17e9bf7765..954689c91b3ae 100644 --- a/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowPageSource.java +++ b/presto-base-arrow-flight/src/main/java/com/facebook/plugin/arrow/ArrowPageSource.java @@ -85,6 +85,86 @@ public ArrowPageSource(ArrowSplit split, List columnHandles, getFlightStream(clientHandler, split.getTicket(), connectorSession); } + @Override + public long getCompletedBytes() + { + return 0; + } + + @Override + public long getCompletedPositions() + { + return currentPosition; + } + + @Override + public long getReadTimeNanos() + { + return 0; + } + + @Override + public boolean isFinished() + { + return completed; + } + + @Override + public long getSystemMemoryUsage() + { + return 0; + } + + @Override + public Page getNextPage() + { + if (vectorSchemaRoot.isPresent()) { + vectorSchemaRoot.get().close(); + vectorSchemaRoot = Optional.empty(); + } + + if (flightStream.next()) { + vectorSchemaRoot = Optional.ofNullable(flightStream.getRoot()); + } + + if (!vectorSchemaRoot.isPresent()) { + completed = true; + } + + if (isFinished()) { + return null; + } + + currentPosition++; + + List blocks = new ArrayList<>(); + for (int columnIndex = 0; columnIndex < columnHandles.size(); columnIndex++) { + FieldVector vector = vectorSchemaRoot.get().getVector(columnIndex); + Type type = columnHandles.get(columnIndex).getColumnType(); + + Block block = buildBlockFromVector(vector, type); + blocks.add(block); + } + + return new Page(vectorSchemaRoot.get().getRowCount(), blocks.toArray(new Block[0])); + } + + @Override + public void close() throws IOException + { + if (vectorSchemaRoot.isPresent()) { + vectorSchemaRoot.get().close(); + } + if (flightStream != null) { + try { + flightStream.close(); + } + catch (Exception e) { + logger.error(e); + } + } + } + private void getFlightStream(ArrowFlightClientHandler clientHandler, byte[] ticket, ConnectorSession connectorSession) { try { @@ -483,84 +563,4 @@ private Block buildBlockFromTimeStampSecVector(TimeStampSecVector vector, Type t } return builder.build(); } - - @Override - public long getCompletedBytes() - { - return 0; - } - - @Override - public long getCompletedPositions() - { - return currentPosition; - } - - @Override - public long getReadTimeNanos() - { - return 0; - } - - @Override - public boolean isFinished() - { - return completed; - } - - @Override - public long getSystemMemoryUsage() - { - return 0; - } - - @Override - public Page getNextPage() - { - if (vectorSchemaRoot.isPresent()) { - vectorSchemaRoot.get().close(); - vectorSchemaRoot = Optional.empty(); - } - - if (flightStream.next()) { - vectorSchemaRoot = Optional.ofNullable(flightStream.getRoot()); - } - - if (!vectorSchemaRoot.isPresent()) { - completed = true; - } - - if (isFinished()) { - return null; - } - - currentPosition++; - - List blocks = new ArrayList<>(); - for (int columnIndex = 0; columnIndex < columnHandles.size(); columnIndex++) { - FieldVector vector = vectorSchemaRoot.get().getVector(columnIndex); - Type type = columnHandles.get(columnIndex).getColumnType(); - - Block block = buildBlockFromVector(vector, type); - blocks.add(block); - } - - return new Page(vectorSchemaRoot.get().getRowCount(), blocks.toArray(new Block[0])); - } - - @Override - public void close() throws IOException - { - if (vectorSchemaRoot.isPresent()) { - vectorSchemaRoot.get().close(); - } - if (flightStream != null) { - try { - flightStream.close(); - } - catch (Exception e) { - logger.error(e); - } - } - } }