Commit

Fixes an upgrade compatibility breakage for the BQ write transform (apache#30032)

* Fixes an upgrade compatibility breakage for the BQ write transform

* Addressing reviewer comments

* Resolve a conflict
chamikaramj authored Jan 23, 2024
1 parent 0a813b9 commit 93339bc
Showing 15 changed files with 111 additions and 50 deletions.
Original file line number Diff line number Diff line change
@@ -40,6 +40,7 @@
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.schemas.SchemaTranslation;
import org.apache.beam.sdk.transforms.PTransform;
@@ -609,7 +610,7 @@ default Row toConfigRow(T transform) {
* {@link #toConfigRow(PTransform)} method.
* @return a transform represented by the current {@code TransformPayloadTranslator}.
*/
default T fromConfigRow(Row configRow) {
default T fromConfigRow(Row configRow, PipelineOptions options) {
throw new UnsupportedOperationException("Not implemented");
}

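To illustrate why the extra PipelineOptions parameter is useful, here is a minimal hypothetical helper (not part of this commit; the class and field names are placeholders): a translator can consult the pipeline's update-compatibility version to decide which field name a serialized config Row actually contains.

    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.StreamingOptions;
    import org.apache.beam.sdk.values.Row;

    final class ConfigRowCompatSketch {
      private ConfigRowCompatSketch() {}

      // Reads a value whose field name changed between Beam releases, using the
      // update-compatibility version carried in the pipeline options to decide
      // which name the serialized Row contains. Field names are placeholders.
      static String readWithCompat(Row configRow, PipelineOptions options) {
        String compatVersion =
            options.as(StreamingOptions.class).getUpdateCompatibilityVersion();
        String fieldName = "2.53.0".equals(compatVersion) ? "legacy_field" : "field";
        return configRow.getString(fieldName);
      }
    }
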
Original file line number Diff line number Diff line change
@@ -111,14 +111,13 @@ public void visitPrimitiveTransform(Node node) {
res = elideDeprecatedViews(res);
}

ExternalTranslationOptions externalTranslationOptions =
pipeline.getOptions().as(ExternalTranslationOptions.class);
List<String> urnsToOverride = externalTranslationOptions.getTransformsToOverride();
List<String> urnsToOverride =
pipeline.getOptions().as(ExternalTranslationOptions.class).getTransformsToOverride();
if (urnsToOverride.size() > 0 && upgradeTransforms) {
try (TransformUpgrader upgrader = TransformUpgrader.of()) {
res =
upgrader.upgradeTransformsViaTransformService(
res, urnsToOverride, externalTranslationOptions);
res, urnsToOverride, pipeline.getOptions());
} catch (Exception e) {
throw new RuntimeException(
"Could not override the transforms with URNs " + urnsToOverride, e);
Original file line number Diff line number Diff line change
@@ -41,8 +41,11 @@
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.SchemaApi;
import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transformservice.launcher.TransformServiceLauncher;
import org.apache.beam.sdk.util.ReleaseInfo;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString;
@@ -97,7 +100,7 @@ static TransformUpgrader of(ExpansionServiceClientFactory clientFactory) {
* @throws Exception
*/
public RunnerApi.Pipeline upgradeTransformsViaTransformService(
RunnerApi.Pipeline pipeline, List<String> urnsToOverride, ExternalTranslationOptions options)
RunnerApi.Pipeline pipeline, List<String> urnsToOverride, PipelineOptions options)
throws IOException, TimeoutException {
List<String> transformsToOverride =
pipeline.getComponents().getTransformsMap().entrySet().stream()
@@ -127,13 +130,15 @@ public RunnerApi.Pipeline upgradeTransformsViaTransformService(
String serviceAddress;
TransformServiceLauncher service = null;

if (options.getTransformServiceAddress() != null) {
serviceAddress = options.getTransformServiceAddress();
} else if (options.getTransformServiceBeamVersion() != null) {
ExternalTranslationOptions externalTranslationOptions =
options.as(ExternalTranslationOptions.class);
if (externalTranslationOptions.getTransformServiceAddress() != null) {
serviceAddress = externalTranslationOptions.getTransformServiceAddress();
} else if (externalTranslationOptions.getTransformServiceBeamVersion() != null) {
String projectName = UUID.randomUUID().toString();
int port = findAvailablePort();
service = TransformServiceLauncher.forProject(projectName, port, null);
service.setBeamVersion(options.getTransformServiceBeamVersion());
service.setBeamVersion(externalTranslationOptions.getTransformServiceBeamVersion());

// Starting the transform service.
service.start();
@@ -169,7 +174,7 @@ RunnerApi.Pipeline updateTransformViaTransformService(
RunnerApi.Pipeline runnerAPIpipeline,
String transformId,
Endpoints.ApiServiceDescriptor transformServiceEndpoint,
ExternalTranslationOptions options)
PipelineOptions options)
throws IOException {
RunnerApi.PTransform transformToUpgrade =
runnerAPIpipeline.getComponents().getTransformsMap().get(transformId);
@@ -207,11 +212,26 @@ RunnerApi.Pipeline updateTransformViaTransformService(

ExpansionApi.ExpansionRequest.Builder requestBuilder =
ExpansionApi.ExpansionRequest.newBuilder();

// Creating a clone here so that we can set properties without modifying the original
// PipelineOptions object.
PipelineOptions optionsClone =
PipelineOptionsTranslation.fromProto(PipelineOptionsTranslation.toProto(options));
String updateCompatibilityVersion =
optionsClone.as(StreamingOptions.class).getUpdateCompatibilityVersion();
if (updateCompatibilityVersion == null || updateCompatibilityVersion.isEmpty()) {
// Setting the option 'updateCompatibilityVersion' to the current SDK version so that the
// TransformService uses a compatible schema.
optionsClone
.as(StreamingOptions.class)
.setUpdateCompatibilityVersion(ReleaseInfo.getReleaseInfo().getSdkVersion());
}
ExpansionApi.ExpansionRequest request =
requestBuilder
.setComponents(runnerAPIpipeline.getComponents())
.setTransform(ptransformBuilder.build())
.setNamespace(UPGRADE_NAMESPACE)
.setPipelineOptions(PipelineOptionsTranslation.toProto(optionsClone))
.addAllRequirements(runnerAPIpipeline.getRequirementsList())
.build();

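The clone-and-default step above can be read in isolation as the following condensed sketch (same logic as the commit, shown as a standalone helper for clarity):

    // Copy the options through a proto round trip so the caller's PipelineOptions is
    // never mutated, then default updateCompatibilityVersion to the running SDK version
    // so the transform service expands against a compatible schema.
    static PipelineOptions cloneWithCompatDefault(PipelineOptions options) {
      PipelineOptions clone =
          PipelineOptionsTranslation.fromProto(PipelineOptionsTranslation.toProto(options));
      StreamingOptions streaming = clone.as(StreamingOptions.class);
      if (streaming.getUpdateCompatibilityVersion() == null
          || streaming.getUpdateCompatibilityVersion().isEmpty()) {
        streaming.setUpdateCompatibilityVersion(ReleaseInfo.getReleaseInfo().getSdkVersion());
      }
      return clone;
    }
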
@@ -242,7 +262,8 @@ RunnerApi.Pipeline updateTransformViaTransformService(

// Adds an annotation that denotes the Beam version the transform was upgraded to.
RunnerApi.PTransform.Builder expandedTransformBuilder = expandedTransform.toBuilder();
String transformServiceVersion = options.getTransformServiceBeamVersion();
String transformServiceVersion =
options.as(ExternalTranslationOptions.class).getTransformServiceBeamVersion();
if (transformServiceVersion == null || transformServiceVersion.isEmpty()) {
transformServiceVersion = "unknown";
}
Original file line number Diff line number Diff line change
@@ -35,6 +35,7 @@
import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.schemas.Schema;
@@ -93,7 +94,7 @@ public String getUrn() {
}

@Override
public TestTransform fromConfigRow(Row configRow) {
public TestTransform fromConfigRow(Row configRow, PipelineOptions options) {
return new TestTransform(configRow.getInt32("multiplier"));
}

Original file line number Diff line number Diff line change
@@ -152,7 +152,8 @@ public Map<String, ExpansionService.TransformProvider> knownTransforms() {
TransformProvider transformProvider =
new TransformProvider() {
@Override
public PTransform getTransform(RunnerApi.FunctionSpec spec) {
public PTransform getTransform(
RunnerApi.FunctionSpec spec, PipelineOptions options) {
try {
Class configClass = getConfigClass(builderInstance);
return builderInstance.buildExternal(
@@ -222,14 +223,14 @@ public List<String> getDependencies(
}
final String finalUrn = urn;
TransformProvider transformProvider =
spec -> {
(spec, options) -> {
try {
ExternalConfigurationPayload payload =
ExternalConfigurationPayload.parseFrom(spec.getPayload());
Row configRow =
RowCoder.of(SchemaTranslation.schemaFromProto(payload.getSchema()))
.decode(new ByteArrayInputStream(payload.getPayload().toByteArray()));
PTransform transformFromRow = translator.fromConfigRow(configRow);
PTransform transformFromRow = translator.fromConfigRow(configRow, options);
if (transformFromRow != null) {
return transformFromRow;
} else {
@@ -441,7 +442,7 @@ default InputT createInput(Pipeline p, Map<String, PCollection<?>> inputs) {
}
}

PTransform<InputT, OutputT> getTransform(RunnerApi.FunctionSpec spec);
PTransform<InputT, OutputT> getTransform(RunnerApi.FunctionSpec spec, PipelineOptions options);

default Map<String, PCollection<?>> extractOutputs(OutputT output) {
if (output instanceof PDone) {
@@ -485,7 +486,8 @@ default Map<String, PCollection<?>> extractOutputs(OutputT output) {
default Map<String, PCollection<?>> apply(
Pipeline p, String name, RunnerApi.FunctionSpec spec, Map<String, PCollection<?>> inputs) {
return extractOutputs(
Pipeline.applyTransform(name, createInput(p, inputs), getTransform(spec)));
Pipeline.applyTransform(
name, createInput(p, inputs), getTransform(spec, p.getOptions())));
}

default String getTransformUniqueID(RunnerApi.FunctionSpec spec) {
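
As a usage illustration of the widened signature, a hypothetical registrar (class name and URN are placeholders) now supplies a two-argument lambda:

    // Hypothetical registrar: the TransformProvider lambda receives the expansion-time
    // PipelineOptions alongside the FunctionSpec, so providers can honor options such
    // as updateCompatibilityVersion when reconstructing transforms.
    public static class UppercaseRegistrar implements ExpansionService.ExpansionServiceRegistrar {
      @Override
      public Map<String, ExpansionService.TransformProvider> knownTransforms() {
        return ImmutableMap.of(
            "beam:transform:placeholder:uppercase:v1",
            (spec, options) ->
                MapElements.into(TypeDescriptors.strings()).via((String s) -> s.toUpperCase()));
      }
    }
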
Original file line number Diff line number Diff line change
@@ -29,6 +29,7 @@
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.expansion.service.ExpansionService.TransformProvider;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaTranslation;
import org.apache.beam.sdk.transforms.PTransform;
@@ -91,7 +92,7 @@ public Map<String, PCollection<?>> extractOutputs(PCollectionRowTuple output) {
}

@Override
public PTransform getTransform(FunctionSpec spec) {
public PTransform getTransform(FunctionSpec spec, PipelineOptions options) {
SchemaTransformPayload payload;
try {
payload = SchemaTransformPayload.parseFrom(spec.getPayload());
Original file line number Diff line number Diff line change
@@ -46,6 +46,7 @@
import org.apache.beam.repackaged.core.org.apache.commons.lang3.ClassUtils;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.expansion.service.ExpansionService.TransformProvider;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.Schema;
@@ -90,7 +91,7 @@ public JavaClassLookupTransformProvider(AllowList allowList) {

@SuppressWarnings("argument")
@Override
public PTransform<PInput, POutput> getTransform(FunctionSpec spec) {
public PTransform<PInput, POutput> getTransform(FunctionSpec spec, PipelineOptions options) {
JavaClassLookupPayload payload;
try {
payload = JavaClassLookupPayload.parseFrom(spec.getPayload());
Original file line number Diff line number Diff line change
@@ -34,6 +34,7 @@
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
@@ -441,10 +442,13 @@ public void testSchematransformEquivalentConfigSchema() throws CoderException {
assertNotEquals(spec.getPayload(), equivalentSpec.getPayload());

TestSchemaTransform transform =
(TestSchemaTransform) ExpansionServiceSchemaTransformProvider.of().getTransform(spec);
(TestSchemaTransform)
ExpansionServiceSchemaTransformProvider.of()
.getTransform(spec, PipelineOptionsFactory.create());
TestSchemaTransform equivalentTransform =
(TestSchemaTransform)
ExpansionServiceSchemaTransformProvider.of().getTransform(equivalentSpec);
ExpansionServiceSchemaTransformProvider.of()
.getTransform(equivalentSpec, PipelineOptionsFactory.create());

assertEquals(transform.int1, equivalentTransform.int1);
assertEquals(transform.int2, equivalentTransform.int2);
Original file line number Diff line number Diff line change
@@ -94,7 +94,7 @@ public static class TestTransformRegistrar implements ExpansionService.Expansion

@Override
public Map<String, ExpansionService.TransformProvider> knownTransforms() {
return ImmutableMap.of(TEST_URN, spec -> Count.perElement());
return ImmutableMap.of(TEST_URN, (spec, options) -> Count.perElement());
}
}

Original file line number Diff line number Diff line change
@@ -167,23 +167,23 @@ public static class TestTransforms
public Map<String, ExpansionService.TransformProvider> knownTransforms() {
return ImmutableMap.of(
TEST_URN_SIMPLE,
spec -> MapElements.into(TypeDescriptors.strings()).via((String x) -> x + x),
(spec, options) -> MapElements.into(TypeDescriptors.strings()).via((String x) -> x + x),
TEST_URN_LE,
spec -> Filter.lessThanEq(Integer.parseInt(spec.getPayload().toStringUtf8())),
(spec, options) -> Filter.lessThanEq(Integer.parseInt(spec.getPayload().toStringUtf8())),
TEST_URN_MULTI,
spec ->
ParDo.of(
new DoFn<Integer, Integer>() {
@ProcessElement
public void processElement(ProcessContext c) {
if (c.element() % 2 == 0) {
c.output(c.element());
} else {
c.output(odd, c.element());
}
}
})
.withOutputTags(even, TupleTagList.of(odd)));
(spec, options) ->
ParDo.of(
new DoFn<Integer, Integer>() {
@ProcessElement
public void processElement(ProcessContext c) {
if (c.element() % 2 == 0) {
c.output(c.element());
} else {
c.output(odd, c.element());
}
}
})
.withOutputTags(even, TupleTagList.of(odd)));
}
}
}
Original file line number Diff line number Diff line change
@@ -50,6 +50,8 @@
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.SchemaUpdateOption;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.RowWriterFactory.AvroRowWriterFactory;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.schemas.Schema;
@@ -188,7 +190,7 @@ public Row toConfigRow(TypedRead<?> transform) {
}

@Override
public TypedRead<?> fromConfigRow(Row configRow) {
public TypedRead<?> fromConfigRow(Row configRow, PipelineOptions options) {
try {
BigQueryIO.TypedRead.Builder builder = new AutoValue_BigQueryIO_TypedRead.Builder<>();

@@ -552,7 +554,7 @@ public Row toConfigRow(Write<?> transform) {
}

@Override
public Write<?> fromConfigRow(Row configRow) {
public Write<?> fromConfigRow(Row configRow, PipelineOptions options) {
try {
BigQueryIO.Write.Builder builder = new AutoValue_BigQueryIO_Write.Builder<>();

@@ -695,7 +697,25 @@ public Write<?> fromConfigRow(Row configRow) {
if (maxBytesPerPartition != null) {
builder = builder.setMaxBytesPerPartition(maxBytesPerPartition);
}
Duration triggeringFrequency = configRow.getValue("triggering_frequency");

String updateCompatibilityBeamVersion =
options.as(StreamingOptions.class).getUpdateCompatibilityVersion();

// We need to update the 'triggerring_frequency' field name for pipelines that are upgraded
// from Beam 2.53.0 due to https://github.com/apache/beam/pull/29785.
// We need to set a default 'updateCompatibilityBeamVersion' here since this PipelineOption
// is not correctly passed in for pipelines that use Beam 2.53.0.
// Both above issues are fixed for Beam 2.54.0 and later.
updateCompatibilityBeamVersion =
(updateCompatibilityBeamVersion != null) ? updateCompatibilityBeamVersion : "2.53.0";

String triggeringFrequencyFieldName =
(updateCompatibilityBeamVersion != null
&& updateCompatibilityBeamVersion.equals("2.53.0"))
? "triggerring_frequency"
: "triggering_frequency";

Duration triggeringFrequency = configRow.getValue(triggeringFrequencyFieldName);
if (triggeringFrequency != null) {
builder =
builder.setTriggeringFrequency(
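
As a usage note (a sketch, not commit code): a caller rebuilding a Write transform from a config row serialized by Beam 2.53.0 leaves updateCompatibilityVersion unset (it defaults to 2.53.0 here) or sets it explicitly, so the legacy 'triggerring_frequency' field is read; the test change further below sets "2.54.0" to exercise the new field name.

    // Sketch: rebuilding a BigQuery Write from a config row produced by Beam 2.53.0.
    // 'configRowFrom253' stands in for the Row produced by the 2.53.0 translator.
    PipelineOptions options = PipelineOptionsFactory.create();
    options.as(StreamingOptions.class).setUpdateCompatibilityVersion("2.53.0");
    BigQueryIOTranslation.BigQueryIOWriteTranslator translator =
        new BigQueryIOTranslation.BigQueryIOWriteTranslator();
    BigQueryIO.Write<?> writeFromRow =
        (BigQueryIO.Write<?>) translator.fromConfigRow(configRowFrom253, options);
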
Original file line number Diff line number Diff line change
@@ -31,6 +31,9 @@
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
@@ -142,7 +145,8 @@ public void testReCreateReadTransformFromRowTable() {
Row row = translator.toConfigRow(readTransform);

BigQueryIO.TypedRead<TableRow> readTransformFromRow =
(BigQueryIO.TypedRead<TableRow>) translator.fromConfigRow(row);
(BigQueryIO.TypedRead<TableRow>)
translator.fromConfigRow(row, PipelineOptionsFactory.create());
assertNotNull(readTransformFromRow.getTable());
assertEquals("dummyproject", readTransformFromRow.getTable().getProjectId());
assertEquals("dummydataset", readTransformFromRow.getTable().getDatasetId());
@@ -172,7 +176,8 @@ public void testReCreateReadTransformFromRowQuery() {
new BigQueryIOTranslation.BigQueryIOReadTranslator();
Row row = translator.toConfigRow(readTransform);

BigQueryIO.TypedRead<?> readTransformFromRow = translator.fromConfigRow(row);
BigQueryIO.TypedRead<?> readTransformFromRow =
translator.fromConfigRow(row, PipelineOptionsFactory.create());
assertEquals("dummyquery", readTransformFromRow.getQuery().get());
assertNotNull(readTransformFromRow.getParseFn());
assertTrue(readTransformFromRow.getParseFn() instanceof DummyParseFn);
@@ -241,7 +246,10 @@ public void testReCreateWriteTransformFromRowTable() {
new BigQueryIOTranslation.BigQueryIOWriteTranslator();
Row row = translator.toConfigRow(writeTransform);

BigQueryIO.Write<?> writeTransformFromRow = (BigQueryIO.Write<?>) translator.fromConfigRow(row);
PipelineOptions options = PipelineOptionsFactory.create();
options.as(StreamingOptions.class).setUpdateCompatibilityVersion("2.54.0");
BigQueryIO.Write<?> writeTransformFromRow =
(BigQueryIO.Write<?>) translator.fromConfigRow(row, options);
assertNotNull(writeTransformFromRow.getTable());
assertEquals("dummyproject", writeTransformFromRow.getTable().get().getProjectId());
assertEquals("dummydataset", writeTransformFromRow.getTable().get().getDatasetId());