Skip to content

Commit

Permalink
Add support for UPDATE in iceberg
Browse files Browse the repository at this point in the history
This commit allows users to perform row-level updates when using
the Iceberg connector with Java-based workers.

This is achieved by improving on the IcebergUpdatablePageSource
to implement the updateRows method. The implementation passes
a generated row ID column as a field in the page required by
updateRows. Then, during updateRows, it generates a positionDelete
file entry for the row ID, and also writes the row's updated value to a
new page sink for the newly updated data.

These new files are then committed in a rowDelta transaction within
the Iceberg connector metadata after processing is complete.

Co-Authored-By: Nidhin Varghese <[email protected]>
Co-Authored-By: Anoop V S <[email protected]>
  • Loading branch information
3 people committed Feb 7, 2025
1 parent 72be233 commit cd4df4f
Show file tree
Hide file tree
Showing 40 changed files with 1,183 additions and 227 deletions.
24 changes: 24 additions & 0 deletions presto-docs/src/main/sphinx/connector/iceberg.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1414,6 +1414,30 @@ For example, ``DESCRIBE`` from the partitioned Iceberg table ``customer``:
comment | varchar | |
(3 rows)

UPDATE
^^^^^^

The Iceberg connector supports :doc:`../sql/update` operations on Iceberg
tables. Only some tables support updates. These tables must be at minimum format
version 2, and the ``write.update.mode`` must be set to ``merge-on-read``.

.. code-block:: sql

UPDATE region SET name = 'EU', comment = 'Europe' WHERE regionkey = 1;

.. code-block:: text

UPDATE: 1 row

Query 20250204_010341_00021_ymwi5, FINISHED, 2 nodes

The query returns an error if the table does not meet the requirements for
updates.

.. code-block:: text

Query 20250204_010445_00022_ymwi5 failed: Iceberg table updates require at least format version 2 and update mode must be merge-on-read

Schema Evolution
----------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public int hashCode()
// Renders this object as a colon-separated string.
// NOTE(review): this is a diff view, so both the old and new return lines appear
// below. The first (id and name only) looks like the removed line and the second
// (which also appends typeCategory and children) its replacement — confirm
// against the actual file before relying on either form.
@Override
public String toString()
{
return id + ":" + name;
return id + ":" + name + ":" + typeCategory + ":" + children;
}

public enum TypeCategory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class CommitTaskData
private final Optional<String> partitionDataJson;
private final FileFormat fileFormat;
private final Optional<String> referencedDataFile;
private final FileContent content;

@JsonCreator
public CommitTaskData(
Expand All @@ -38,7 +39,8 @@ public CommitTaskData(
@JsonProperty("partitionSpecJson") int partitionSpecId,
@JsonProperty("partitionDataJson") Optional<String> partitionDataJson,
@JsonProperty("fileFormat") FileFormat fileFormat,
@JsonProperty("referencedDataFile") String referencedDataFile)
@JsonProperty("referencedDataFile") String referencedDataFile,
@JsonProperty("content") FileContent content)
{
this.path = requireNonNull(path, "path is null");
this.fileSizeInBytes = fileSizeInBytes;
Expand All @@ -47,6 +49,7 @@ public CommitTaskData(
this.partitionDataJson = requireNonNull(partitionDataJson, "partitionDataJson is null");
this.fileFormat = requireNonNull(fileFormat, "fileFormat is null");
this.referencedDataFile = Optional.ofNullable(referencedDataFile);
this.content = requireNonNull(content, "content is null");
}

@JsonProperty
Expand Down Expand Up @@ -90,4 +93,10 @@ public Optional<String> getReferencedDataFile()
{
return referencedDataFile;
}

/**
 * Returns the {@link FileContent} value this commit task was constructed with.
 * The constructor rejects {@code null}, so the result is never {@code null}.
 * Exposed as a JSON property.
 */
@JsonProperty
public FileContent getContent()
{
    return this.content;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg;

import com.facebook.presto.spi.ConnectorPageSource;

import java.util.Optional;

import static java.util.Objects.requireNonNull;

/**
 * Immutable holder that pairs a {@link ConnectorPageSource} with an optional
 * start row position and an optional end row position.
 *
 * <p>NOTE(review): what the positions are measured against, and whether the end
 * position is inclusive, is not visible in this class — confirm with the code
 * that constructs it.
 */
public class ConnectorPageSourceWithRowPositions
{
    private final ConnectorPageSource delegate;
    private final Optional<Long> startRowPosition;
    private final Optional<Long> endRowPosition;

    /**
     * Creates the holder after verifying that no argument is {@code null}.
     *
     * @param delegate the wrapped page source
     * @param startRowPosition the start row position, or empty when absent
     * @param endRowPosition the end row position, or empty when absent
     * @throws NullPointerException if any argument is {@code null}
     */
    public ConnectorPageSourceWithRowPositions(
            ConnectorPageSource delegate,
            Optional<Long> startRowPosition,
            Optional<Long> endRowPosition)
    {
        // Validate in declaration order so the first null argument is the one reported.
        requireNonNull(delegate, "delegate is null");
        requireNonNull(startRowPosition, "startRowPosition is null");
        requireNonNull(endRowPosition, "endRowPosition is null");

        this.delegate = delegate;
        this.startRowPosition = startRowPosition;
        this.endRowPosition = endRowPosition;
    }

    /**
     * @return the wrapped page source; never {@code null}
     */
    public ConnectorPageSource getDelegate()
    {
        return this.delegate;
    }

    /**
     * @return the start row position, or empty when none was supplied
     */
    public Optional<Long> getStartRowPosition()
    {
        return this.startRowPosition;
    }

    /**
     * @return the end row position, or empty when none was supplied
     */
    public Optional<Long> getEndRowPosition()
    {
        return this.endRowPosition;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,23 @@ public static FileFormat fromIcebergFileFormat(org.apache.iceberg.FileFormat for

return prestoFileFormat;
}

/**
 * Converts this format constant to the {@code org.apache.iceberg.FileFormat}
 * constant of the same name.
 *
 * @return the Iceberg file format for {@code ORC}, {@code PARQUET} or {@code AVRO}
 * @throws PrestoException with {@code NOT_SUPPORTED} for any other constant
 */
public org.apache.iceberg.FileFormat toIceberg()
{
    // Every branch returns or throws, so no local result variable is needed.
    switch (this) {
        case ORC:
            return org.apache.iceberg.FileFormat.ORC;
        case PARQUET:
            return org.apache.iceberg.FileFormat.PARQUET;
        case AVRO:
            return org.apache.iceberg.FileFormat.AVRO;
        default:
            throw new PrestoException(NOT_SUPPORTED, "Unsupported file format: " + this);
    }
}
}
Loading

0 comments on commit cd4df4f

Please sign in to comment.