Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update begin delete #24528

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions presto-docs/src/main/sphinx/develop/delete-and-update.rst
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ A connector implementing ``DELETE`` must specify three ``ConnectorMetadata`` met

* ``beginDelete()``::

ConnectorTableHandle beginDelete(
ConnectorDeleteTableHandle beginDelete(
ConnectorSession session,
ConnectorTableHandle tableHandle)

Expand All @@ -116,15 +116,15 @@ A connector implementing ``DELETE`` must specify three ``ConnectorMetadata`` met
``beginDelete()`` performs any orchestration needed in the connector to start processing the ``DELETE``.
This orchestration varies from connector to connector.

``beginDelete()`` returns a ``ConnectorTableHandle`` with any added information the connector needs when the handle
``beginDelete()`` returns a ``ConnectorDeleteTableHandle`` with any added information the connector needs when the handle
is passed back to ``finishDelete()`` and the split generation machinery. For most connectors, the returned table
handle contains a flag identifying the table handle as a table handle for a ``DELETE`` operation.

* ``finishDelete()``::

void finishDelete(
ConnectorSession session,
ConnectorTableHandle tableHandle,
ConnectoDeleteTableHandle tableHandle,
Collection<Slice> fragments)

During ``DELETE`` processing, the Presto engine accumulates the ``Slice`` collections returned by ``UpdatablePageSource.finish()``.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import com.facebook.presto.hive.statistics.HiveStatisticsProvider;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorDeleteTableHandle;
import com.facebook.presto.spi.ConnectorInsertTableHandle;
import com.facebook.presto.spi.ConnectorMetadataUpdateHandle;
import com.facebook.presto.spi.ConnectorNewTableLayout;
Expand Down Expand Up @@ -2517,7 +2518,7 @@ public Optional<List<SchemaTableName>> getReferencedMaterializedViews(ConnectorS
}

@Override
public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTableHandle tableHandle)
public ConnectorDeleteTableHandle beginDelete(ConnectorSession session, ConnectorTableHandle tableHandle)
{
throw new PrestoException(NOT_SUPPORTED, "This connector only supports delete where one or more partitions are deleted entirely");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.facebook.presto.iceberg.statistics.StatisticsFileCache;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorDeleteTableHandle;
import com.facebook.presto.spi.ConnectorInsertTableHandle;
import com.facebook.presto.spi.ConnectorNewTableLayout;
import com.facebook.presto.spi.ConnectorOutputTableHandle;
Expand Down Expand Up @@ -856,7 +857,7 @@ public void truncateTable(ConnectorSession session, ConnectorTableHandle tableHa
}

@Override
public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTableHandle tableHandle)
public ConnectorDeleteTableHandle beginDelete(ConnectorSession session, ConnectorTableHandle tableHandle)
{
IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
Table icebergTable = getIcebergTable(session, handle.getSchemaTableName());
Expand All @@ -869,19 +870,17 @@ public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTable
if (formatVersion < MIN_FORMAT_VERSION_FOR_DELETE) {
throw new PrestoException(NOT_SUPPORTED, format("This connector only supports delete where one or more partitions are deleted entirely for table versions older than %d", MIN_FORMAT_VERSION_FOR_DELETE));
}

if (getDeleteMode(icebergTable) == RowLevelOperationMode.COPY_ON_WRITE) {
throw new PrestoException(NOT_SUPPORTED, "This connector only supports delete where one or more partitions are deleted entirely. Configure delete_mode table property to allow row level deletions.");
}

validateTableMode(session, icebergTable);
transaction = icebergTable.newTransaction();

return handle;
}

@Override
public void finishDelete(ConnectorSession session, ConnectorTableHandle tableHandle, Collection<Slice> fragments)
public void finishDelete(ConnectorSession session, ConnectorDeleteTableHandle tableHandle, Collection<Slice> fragments)
{
IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
Table icebergTable = getIcebergTable(session, handle.getSchemaTableName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.facebook.presto.hive.HiveTransactionHandle;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorDeleteTableHandle;
import com.facebook.presto.spi.ConnectorHandleResolver;
import com.facebook.presto.spi.ConnectorInsertTableHandle;
import com.facebook.presto.spi.ConnectorOutputTableHandle;
Expand Down Expand Up @@ -62,6 +63,12 @@ public Class<? extends ConnectorInsertTableHandle> getInsertTableHandleClass()
return IcebergInsertTableHandle.class;
}

@Override
public Class<? extends ConnectorDeleteTableHandle> getDeleteTableHandleClass()
{
return IcebergTableHandle.class;
}

@Override
public Class<? extends ConnectorTransactionHandle> getTransactionHandleClass()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.facebook.presto.iceberg;

import com.facebook.presto.hive.BaseHiveTableHandle;
import com.facebook.presto.spi.ConnectorDeleteTableHandle;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

Expand All @@ -26,6 +27,7 @@

public class IcebergTableHandle
extends BaseHiveTableHandle
implements ConnectorDeleteTableHandle
{
private final IcebergTableName icebergTableName;
private final boolean snapshotSpecified;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.facebook.presto.kudu.properties.PartitionDesign;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorDeleteTableHandle;
import com.facebook.presto.spi.ConnectorInsertTableHandle;
import com.facebook.presto.spi.ConnectorNewTableLayout;
import com.facebook.presto.spi.ConnectorOutputTableHandle;
Expand Down Expand Up @@ -357,13 +358,13 @@ public ColumnHandle getDeleteRowIdColumnHandle(ConnectorSession session, Connect
}

@Override
public ConnectorTableHandle beginDelete(ConnectorSession session, ConnectorTableHandle tableHandle)
public ConnectorDeleteTableHandle beginDelete(ConnectorSession session, ConnectorTableHandle tableHandle)
{
return tableHandle;
return null;
}

@Override
public void finishDelete(ConnectorSession session, ConnectorTableHandle tableHandle, Collection<Slice> fragments)
public void finishDelete(ConnectorSession session, ConnectorDeleteTableHandle tableHandle, Collection<Slice> fragments)
{
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package com.facebook.presto.execution.scheduler;

import com.facebook.presto.metadata.DeleteTableHandle;
import com.facebook.presto.metadata.InsertTableHandle;
import com.facebook.presto.metadata.OutputTableHandle;
import com.facebook.presto.spi.SchemaTableName;
Expand Down Expand Up @@ -106,20 +107,20 @@ public String toString()
public static class DeleteHandle
extends ExecutionWriterTarget
{
private final TableHandle handle;
private final DeleteTableHandle handle;
private final SchemaTableName schemaTableName;

@JsonCreator
public DeleteHandle(
@JsonProperty("handle") TableHandle handle,
@JsonProperty("handle") DeleteTableHandle handle,
@JsonProperty("schemaTableName") SchemaTableName schemaTableName)
{
this.handle = requireNonNull(handle, "handle is null");
this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null");
}

@JsonProperty
public TableHandle getHandle()
public DeleteTableHandle getHandle()
{
return handle;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.facebook.presto.Session;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.metadata.AnalyzeTableHandle;
import com.facebook.presto.metadata.DeleteTableHandle;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.metadata.TableLayoutResult;
import com.facebook.presto.spi.ColumnHandle;
Expand Down Expand Up @@ -157,12 +158,15 @@ private static Optional<DeleteScanInfo> createDeleteScanInfo(PlanNode planNode,

private static Optional<DeleteScanInfo> createDeleteScanInfo(DeleteNode delete, Optional<ExecutionWriterTarget> writerTarget, Metadata metadata, Session session)
{
TableHandle tableHandle = ((ExecutionWriterTarget.DeleteHandle) writerTarget.get()).getHandle();
/* Metadata.getLayout does not accept DeleteTableHandle
* DeleteScanInfo should stay with TableHandle type so the effect does not continue to ripple out
*/
DeleteTableHandle tableHandle = ((ExecutionWriterTarget.DeleteHandle) writerTarget.get()).getHandle(); //unused in placeholder fix
TableScanNode tableScan = getDeleteTableScan(delete);
TupleDomain<ColumnHandle> originalEnforcedConstraint = tableScan.getEnforcedConstraint();
TableLayoutResult layoutResult = metadata.getLayout(
session,
tableHandle,
tableScan.getTable(), //TODO: untested placeholder fix, please check for discrepancies in output
new Constraint<>(originalEnforcedConstraint),
Optional.of(ImmutableSet.copyOf(tableScan.getAssignments().values())));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,13 +379,13 @@ public OptionalLong metadataDelete(Session session, TableHandle tableHandle)
}

@Override
public TableHandle beginDelete(Session session, TableHandle tableHandle)
public DeleteTableHandle beginDelete(Session session, TableHandle tableHandle)
{
return delegate.beginDelete(session, tableHandle);
}

@Override
public void finishDelete(Session session, TableHandle tableHandle, Collection<Slice> fragments)
public void finishDelete(Session session, DeleteTableHandle tableHandle, Collection<Slice> fragments)
{
delegate.finishDelete(session, tableHandle, fragments);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.metadata;

import com.facebook.presto.spi.ConnectorDeleteTableHandle;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Objects;

import static java.util.Objects.requireNonNull;

public final class DeleteTableHandle
{
private final ConnectorId connectorId;
private final ConnectorTransactionHandle transactionHandle;
private final ConnectorDeleteTableHandle connectorHandle;

@JsonCreator
public DeleteTableHandle(
@JsonProperty("connectorId") ConnectorId connectorId,
@JsonProperty("transactionHandle") ConnectorTransactionHandle transactionHandle,
@JsonProperty("connectorHandle") ConnectorDeleteTableHandle connectorHandle)
{
this.connectorId = requireNonNull(connectorId, "connectorId is null");
this.transactionHandle = requireNonNull(transactionHandle, "transactionHandle is null");
this.connectorHandle = requireNonNull(connectorHandle, "connectorHandle is null");
}

@JsonProperty
public ConnectorId getConnectorId()
{
return connectorId;
}

@JsonProperty
public ConnectorTransactionHandle getTransactionHandle()
{
return transactionHandle;
}

@JsonProperty
public ConnectorDeleteTableHandle getConnectorHandle()
{
return connectorHandle;
}

@Override
public int hashCode()
{
return Objects.hash(connectorId, transactionHandle, connectorHandle);
}

@Override
public boolean equals(Object obj)
{
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
DeleteTableHandle o = (DeleteTableHandle) obj;
return Objects.equals(this.connectorId, o.connectorId) &&
Objects.equals(this.transactionHandle, o.transactionHandle) &&
Objects.equals(this.connectorHandle, o.connectorHandle);
}

@Override
public String toString()
{
return connectorId + ":" + connectorHandle;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.metadata;

import com.facebook.presto.spi.ConnectorDeleteTableHandle;

import javax.inject.Inject;

public class DeleteTableHandleJacksonModule
extends AbstractTypedJacksonModule<ConnectorDeleteTableHandle>
{
@Inject
public DeleteTableHandleJacksonModule(HandleResolver handleResolver)
{
super(ConnectorDeleteTableHandle.class,
handleResolver::getId,
handleResolver::getDeleteTableHandleClass);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public void configure(Binder binder)
jsonBinder(binder).addModuleBinding().to(SplitJacksonModule.class);
jsonBinder(binder).addModuleBinding().to(OutputTableHandleJacksonModule.class);
jsonBinder(binder).addModuleBinding().to(InsertTableHandleJacksonModule.class);
jsonBinder(binder).addModuleBinding().to(DeleteTableHandleJacksonModule.class);
jsonBinder(binder).addModuleBinding().to(IndexHandleJacksonModule.class);
jsonBinder(binder).addModuleBinding().to(TransactionHandleJacksonModule.class);
jsonBinder(binder).addModuleBinding().to(PartitioningHandleJacksonModule.class);
Expand Down
Loading
Loading