#468 Take into account maximum string length when comparing schemas.
yruslan committed Aug 21, 2024
1 parent: 393c4bc · commit: 69825e7
Showing 2 changed files with 43 additions and 10 deletions.
SparkUtils.scala

@@ -125,10 +125,19 @@ object SparkUtils {
     * Compares 2 schemas.
     */
   def compareSchemas(schema1: StructType, schema2: StructType): List[FieldChange] = {
-    def dataTypeToString(dt: DataType): String = {
+    def dataTypeToString(dt: DataType, metadata: Metadata): String = {
+      val maxLength = if (metadata.contains(MAX_LENGTH_METADATA_KEY)) {
+        Try {
+          metadata.getLong(MAX_LENGTH_METADATA_KEY).toInt
+        }.getOrElse(0)
+      } else {
+        0
+      }
+
       dt match {
-        case _: StructType | _: ArrayType => dt.simpleString
-        case _ => dt.typeName
+        case _: StructType | _: ArrayType => dt.simpleString
+        case _: StringType if maxLength > 0 => s"varchar($maxLength)"
+        case _ => dt.typeName
       }
     }

@@ -137,22 +146,23 @@ object SparkUtils {

     val newColumns: Array[FieldChange] = schema2.fields
       .filter(f => !fields1.contains(f.name))
-      .map(f => FieldChange.NewField(f.name, dataTypeToString(f.dataType)))
+      .map(f => FieldChange.NewField(f.name, dataTypeToString(f.dataType, f.metadata)))

     val deletedColumns: Array[FieldChange] = schema1.fields
       .filter(f => !fields2.contains(f.name))
-      .map(f => FieldChange.DeletedField(f.name, dataTypeToString(f.dataType)))
+      .map(f => FieldChange.DeletedField(f.name, dataTypeToString(f.dataType, f.metadata)))

     val changedType: Array[FieldChange] = schema1.fields
       .filter(f => fields2.contains(f.name))
-      .flatMap(f => {
-        val dt1 = dataTypeToString(f.dataType)
-        val dt2 = dataTypeToString(fields2(f.name).dataType)
+      .flatMap(f1 => {
+        val dt1 = dataTypeToString(f1.dataType, f1.metadata)
+        val f2 = fields2(f1.name)
+        val dt2 = dataTypeToString(f2.dataType, f2.metadata)

         if (dt1 == dt2) {
           Seq.empty[FieldChange]
         } else {
-          Seq(FieldChange.ChangedType(f.name, dt1, dt2))
+          Seq(FieldChange.ChangedType(f1.name, dt1, dt2))
         }
       })
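
For reference, the patched helper is easy to exercise outside Pramen. Below is a minimal sketch of the new behavior, assuming MAX_LENGTH_METADATA_KEY resolves to a "maxLength" metadata key (the constant's actual value is defined elsewhere in SparkUtils and does not appear in this diff):

import org.apache.spark.sql.types._
import scala.util.Try

object MaxLengthDemo {
  // Assumption: the real constant lives in SparkUtils; "maxLength" is a stand-in.
  val MAX_LENGTH_METADATA_KEY = "maxLength"

  // Mirrors the patched dataTypeToString: a string type whose field metadata
  // carries a positive max length renders as varchar(n) instead of "string".
  def dataTypeToString(dt: DataType, metadata: Metadata): String = {
    val maxLength = if (metadata.contains(MAX_LENGTH_METADATA_KEY)) {
      Try(metadata.getLong(MAX_LENGTH_METADATA_KEY).toInt).getOrElse(0)
    } else {
      0
    }

    dt match {
      case _: StructType | _: ArrayType => dt.simpleString
      case _: StringType if maxLength > 0 => s"varchar($maxLength)"
      case _ => dt.typeName
    }
  }

  def main(args: Array[String]): Unit = {
    val md = new MetadataBuilder().putLong(MAX_LENGTH_METADATA_KEY, 10L).build()
    println(dataTypeToString(StringType, md))             // varchar(10)
    println(dataTypeToString(StringType, Metadata.empty)) // string
    println(dataTypeToString(IntegerType, md))            // integer
  }
}

Because the rendered type string now embeds the length, two schemas that differ only in this metadata no longer compare as identical, which is exactly what the changedType branch in the second hunk above picks up.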
SparkUtilsSuite.scala

@@ -19,7 +19,7 @@ package za.co.absa.pramen.core.tests.utils
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.functions.lit
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.{DataFrame, Row, types}
 import org.scalatest.wordspec.AnyWordSpec
 import za.co.absa.pramen.api.FieldChange._
 import za.co.absa.pramen.core.NestedDataFrameFactory
@@ -251,6 +251,29 @@ class SparkUtilsSuite extends AnyWordSpec with SparkTestBase with TempDirFixture
       assert(diff.head.asInstanceOf[ChangedType].oldType == "integer")
       assert(diff.head.asInstanceOf[ChangedType].newType == "double")
     }
+
+    "detect changed string data types when metadata has changed " in {
+      val schema1Orig = exampleDf.schema
+
+      val metadata1 = (new MetadataBuilder).putLong(MAX_LENGTH_METADATA_KEY, 10L).build
+      val newField1 = schema1Orig.fields.head.copy(metadata = metadata1)
+      val schema1 = schema1Orig.copy(fields = newField1 +: schema1Orig.fields.tail)
+
+      val metadata2 = (new MetadataBuilder).putLong(MAX_LENGTH_METADATA_KEY, 15L).build
+      val newField2 = schema1Orig.fields.head.copy(metadata = metadata2)
+      val schema2 = schema1Orig.copy(fields = newField2 +: schema1Orig.fields.tail)
+
+      println(schema1.prettyJson)
+      println(schema2.prettyJson)
+
+      val diff = compareSchemas(schema1, schema2)
+
+      assert(diff.length == 1)
+      assert(diff.head.isInstanceOf[ChangedType])
+      assert(diff.head.asInstanceOf[ChangedType].columnName == "a")
+      assert(diff.head.asInstanceOf[ChangedType].oldType == "varchar(10)")
+      assert(diff.head.asInstanceOf[ChangedType].newType == "varchar(15)")
+    }
   }

   "applyTransformations" should {
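
The test mutates StructField metadata by copying fields directly. A DataFrame-level alternative is Column.as(alias, metadata), which attaches the metadata when the column is selected. The sketch below is hypothetical: it assumes the "maxLength" key and the compareSchemas entry point from this commit (its exact import path is not shown in the diff, so the call is left as a comment):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.MetadataBuilder

object CompareSchemasDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    import spark.implicits._

    val df = Seq(("aaa", 1), ("bbb", 2)).toDF("a", "b")

    // Re-select column "a" with a max-length annotation ("maxLength" key is assumed).
    def withMaxLength(n: Long) = {
      val md = new MetadataBuilder().putLong("maxLength", n).build()
      df.withColumn("a", col("a").as("a", md))
    }

    val before = withMaxLength(10L).schema
    val after  = withMaxLength(15L).schema

    // With this commit, comparing the two schemas should yield
    // ChangedType(a, varchar(10), varchar(15)); previously both fields
    // rendered as "string" and the difference was invisible:
    // compareSchemas(before, after).foreach(println)

    spark.stop()
  }
}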
