From d4a2e3daa39bfa35d5513794e67507d015e6ae5f Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Fri, 10 Jan 2025 08:36:37 +0100 Subject: [PATCH] #534 Allow specifying the explicit number of partitions for metastore tables. --- .../za/co/absa/pramen/api/DataFormat.scala | 4 +- .../za/co/absa/pramen/api/PartitionInfo.scala | 28 +++++++++++ .../pramen/core/metastore/MetastoreImpl.scala | 1 - .../metastore/model/DataFormatParser.scala | 33 ++++++++++--- .../peristence/MetastorePersistence.scala | 8 ++-- .../MetastorePersistenceDelta.scala | 23 ++------- .../MetastorePersistenceParquet.scala | 39 ++++++++------- .../pipeline/PythonTransformationJob.scala | 18 +++++-- .../metastore/model/DataFormatSuite.scala | 47 ++++++++++++++++--- .../metastore/model/HiveConfigSuite.scala | 6 +-- .../core/metastore/model/MetaTableSuite.scala | 3 +- .../MetastorePersistenceParquetSuite.scala | 9 ++-- .../MetastorePersistenceSuite.scala | 34 +++++++++++--- .../pramen/core/mocks/MetaTableFactory.scala | 2 +- .../absa/pramen/core/mocks/job/JobSpy.scala | 2 +- .../core/mocks/metastore/MetastoreSpy.scala | 7 +-- .../core/mocks/sink/CmdLineSinkSuite.scala | 2 +- .../PythonTransformationJobSuite.scala | 19 ++++---- .../core/source/SourceValidationSuite.scala | 2 +- .../tests/utils/hive/HiveHelperSuite.scala | 2 +- .../ConversionTransformerSuite.scala | 2 +- .../EcsNotificationTargetSuite.scala | 3 +- .../EcsPipelineNotificationTargetSuite.scala | 6 +-- 23 files changed, 198 insertions(+), 102 deletions(-) create mode 100644 pramen/api/src/main/scala/za/co/absa/pramen/api/PartitionInfo.scala diff --git a/pramen/api/src/main/scala/za/co/absa/pramen/api/DataFormat.scala b/pramen/api/src/main/scala/za/co/absa/pramen/api/DataFormat.scala index 895acec35..71a6b08ee 100644 --- a/pramen/api/src/main/scala/za/co/absa/pramen/api/DataFormat.scala +++ b/pramen/api/src/main/scala/za/co/absa/pramen/api/DataFormat.scala @@ -28,7 +28,7 @@ sealed trait DataFormat { } object DataFormat { - case class Parquet(path: String, recordsPerPartition: Option[Long]) extends DataFormat { + case class Parquet(path: String, partitionInfo: PartitionInfo = PartitionInfo.Default) extends DataFormat { override def name: String = "parquet" override val isTransient: Boolean = false @@ -38,7 +38,7 @@ object DataFormat { override val isRaw: Boolean = false } - case class Delta(query: Query, recordsPerPartition: Option[Long]) extends DataFormat { + case class Delta(query: Query, partitionInfo: PartitionInfo = PartitionInfo.Default) extends DataFormat { override def name: String = "delta" override val isTransient: Boolean = false diff --git a/pramen/api/src/main/scala/za/co/absa/pramen/api/PartitionInfo.scala b/pramen/api/src/main/scala/za/co/absa/pramen/api/PartitionInfo.scala new file mode 100644 index 000000000..e0947751d --- /dev/null +++ b/pramen/api/src/main/scala/za/co/absa/pramen/api/PartitionInfo.scala @@ -0,0 +1,28 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.pramen.api + +trait PartitionInfo + +object PartitionInfo { + case object Default extends PartitionInfo + + case class Explicit(numberOfPartitions: Int) extends PartitionInfo + + case class PerRecordCount(recordsPerPartition: Long) extends PartitionInfo + +} diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala index cc704062f..9ab84a6d2 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala @@ -287,7 +287,6 @@ object MetastoreImpl { private val log = LoggerFactory.getLogger(this.getClass) val METASTORE_KEY = "pramen.metastore.tables" - val DEFAULT_RECORDS_PER_PARTITION = 500000 def fromConfig(conf: Config, runtimeConfig: RuntimeConfig, diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/DataFormatParser.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/DataFormatParser.scala index fe00872c4..e17add83e 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/DataFormatParser.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/DataFormatParser.scala @@ -17,7 +17,7 @@ package za.co.absa.pramen.core.metastore.model import com.typesafe.config.Config -import za.co.absa.pramen.api.{CachePolicy, DataFormat, Query} +import za.co.absa.pramen.api.{CachePolicy, DataFormat, PartitionInfo, Query} import za.co.absa.pramen.core.utils.ConfigUtils object DataFormatParser { @@ -30,6 +30,7 @@ object DataFormatParser { val FORMAT_KEY = "format" val PATH_KEY = "path" val TABLE_KEY = "table" + val NUMBER_OF_PARTITIONS_KEY = "number.of.partitions" val RECORDS_PER_PARTITION_KEY = "records.per.partition" val CACHE_POLICY_KEY = "cache.policy" val DEFAULT_FORMAT = "parquet" @@ -44,14 +45,12 @@ object DataFormatParser { format match { case FORMAT_PARQUET => val path = getPath(conf) - val recordsPerPartition = ConfigUtils.getOptionLong(conf, RECORDS_PER_PARTITION_KEY) - .orElse(defaultRecordsPerPartition) - DataFormat.Parquet(path, recordsPerPartition) + val partitionInfo = getPartitionInfo(conf, defaultRecordsPerPartition) + DataFormat.Parquet(path, partitionInfo) case FORMAT_DELTA => val query = getQuery(conf) - val recordsPerPartition = ConfigUtils.getOptionLong(conf, RECORDS_PER_PARTITION_KEY) - .orElse(defaultRecordsPerPartition) - DataFormat.Delta(query, recordsPerPartition) + val partitionInfo = getPartitionInfo(conf, defaultRecordsPerPartition) + DataFormat.Delta(query, partitionInfo) case FORMAT_RAW => if (!conf.hasPath(PATH_KEY)) throw new IllegalArgumentException(s"Mandatory option for a metastore table having 'raw' format: $PATH_KEY") val path = Query.Path(conf.getString(PATH_KEY)).path @@ -66,6 +65,26 @@ object DataFormatParser { } } + private[core] def getPartitionInfo(conf: Config, defaultRecordsPerPartition: Option[Long]): PartitionInfo = { + val numberOfPartitionsOpt = ConfigUtils.getOptionInt(conf, NUMBER_OF_PARTITIONS_KEY) + val recordsPerPartitionOpt = ConfigUtils.getOptionLong(conf, RECORDS_PER_PARTITION_KEY) + + (numberOfPartitionsOpt, recordsPerPartitionOpt) match { + case (Some(_), Some(_)) => + throw new IllegalArgumentException( + s"Both '$NUMBER_OF_PARTITIONS_KEY' and '$RECORDS_PER_PARTITION_KEY' are specified. 
Please specify only one of those options.") + case (Some(nop), None) => + PartitionInfo.Explicit(nop) + case (None, Some(rpp)) => + PartitionInfo.PerRecordCount(rpp) + case (None, None) => + defaultRecordsPerPartition match { + case Some(rpp) => PartitionInfo.PerRecordCount(rpp) + case None => PartitionInfo.Default + } + } + } + private[core] def getCachePolicy(conf: Config): Option[CachePolicy] = { if (conf.hasPath(CACHE_POLICY_KEY)) { conf.getString(CACHE_POLICY_KEY).trim.toLowerCase match { diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistence.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistence.scala index 93bde1b65..f63ba6ba0 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistence.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistence.scala @@ -47,13 +47,13 @@ object MetastorePersistence { val saveModeOpt = saveModeOverride.orElse(metaTable.saveModeOpt) metaTable.format match { - case DataFormat.Parquet(path, recordsPerPartition) => + case DataFormat.Parquet(path, partitionInfo) => new MetastorePersistenceParquet( - path, metaTable.infoDateColumn, metaTable.infoDateFormat, metaTable.batchIdColumn, batchId, recordsPerPartition, saveModeOpt, metaTable.readOptions, metaTable.writeOptions + path, metaTable.infoDateColumn, metaTable.infoDateFormat, metaTable.batchIdColumn, batchId, partitionInfo, saveModeOpt, metaTable.readOptions, metaTable.writeOptions ) - case DataFormat.Delta(query, recordsPerPartition) => + case DataFormat.Delta(query, partitionInfo) => new MetastorePersistenceDelta( - query, metaTable.infoDateColumn, metaTable.infoDateFormat, metaTable.batchIdColumn, batchId, metaTable.partitionByInfoDate, recordsPerPartition, saveModeOpt, metaTable.readOptions, metaTable.writeOptions + query, metaTable.infoDateColumn, metaTable.infoDateFormat, metaTable.batchIdColumn, batchId, metaTable.partitionByInfoDate, partitionInfo, saveModeOpt, metaTable.readOptions, metaTable.writeOptions ) case DataFormat.Raw(path) => new MetastorePersistenceRaw(path, metaTable.infoDateColumn, metaTable.infoDateFormat, saveModeOpt) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceDelta.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceDelta.scala index faa54ee94..5974dc6e8 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceDelta.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceDelta.scala @@ -21,9 +21,10 @@ import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.DateType import org.apache.spark.sql.{Column, DataFrame, SaveMode, SparkSession} import org.slf4j.LoggerFactory -import za.co.absa.pramen.api.Query +import za.co.absa.pramen.api.{PartitionInfo, Query} import za.co.absa.pramen.core.metastore.MetaTableStats import za.co.absa.pramen.core.metastore.model.HiveConfig +import za.co.absa.pramen.core.metastore.peristence.MetastorePersistenceParquet.applyPartitioning import za.co.absa.pramen.core.utils.Emoji.SUCCESS import za.co.absa.pramen.core.utils.hive.QueryExecutor import za.co.absa.pramen.core.utils.{FsUtils, StringUtils} @@ -38,7 +39,7 @@ class MetastorePersistenceDelta(query: Query, batchIdColumn: String, batchId: Long, partitionByInfoDate: Boolean, - recordsPerPartition: 
Option[Long], + partitionInfo: PartitionInfo, saveModeOpt: Option[SaveMode], readOptions: Map[String, String], writeOptions: Map[String, String] @@ -67,13 +68,8 @@ class MetastorePersistenceDelta(query: Query, val whereCondition = s"$infoDateColumn='$infoDateStr'" - val dfRepartitioned = if (partitionByInfoDate && recordsPerPartition.nonEmpty) { - val recordCount = numberOfRecordsEstimate match { - case Some(count) => count - case None => df.count() - } - - applyRepartitioning(df, recordCount) + val dfRepartitioned = if (partitionByInfoDate) { + applyPartitioning(df, partitionInfo, numberOfRecordsEstimate) } else { df } @@ -206,15 +202,6 @@ class MetastorePersistenceDelta(query: Query, } } - def applyRepartitioning(dfIn: DataFrame, recordCount: Long): DataFrame = { - recordsPerPartition match { - case None => dfIn - case Some(rpp) => - val numPartitions = Math.max(1, Math.ceil(recordCount.toDouble / rpp)).toInt - dfIn.repartition(numPartitions) - } - } - private def getPartitionPath(infoDate: LocalDate): Path = { val partition = s"$infoDateColumn=${dateFormatter.format(infoDate)}" new Path(query.query, partition) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceParquet.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceParquet.scala index e5e5fd40f..396b984cf 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceParquet.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceParquet.scala @@ -20,6 +20,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql.functions._ import org.apache.spark.sql.{Column, DataFrame, SaveMode, SparkSession} import org.slf4j.LoggerFactory +import za.co.absa.pramen.api.PartitionInfo import za.co.absa.pramen.core.config.Keys import za.co.absa.pramen.core.metastore.MetaTableStats import za.co.absa.pramen.core.metastore.model.HiveConfig @@ -38,12 +39,14 @@ class MetastorePersistenceParquet(path: String, infoDateFormat: String, batchIdColumn: String, batchId: Long, - recordsPerPartition: Option[Long], + partitionInfo: PartitionInfo, saveModeOpt: Option[SaveMode], readOptions: Map[String, String], writeOptions: Map[String, String] )(implicit spark: SparkSession) extends MetastorePersistence { + import MetastorePersistenceParquet._ + private val log = LoggerFactory.getLogger(this.getClass) private val dateFormatter = DateTimeFormatter.ofPattern(infoDateFormat) @@ -83,16 +86,7 @@ class MetastorePersistenceParquet(path: String, df } - val dfRepartitioned = if (recordsPerPartition.nonEmpty) { - val recordCount = numberOfRecordsEstimate match { - case Some(count) => count - case None => dfIn.count() - } - - applyRepartitioning(dfIn, recordCount) - } else { - dfIn - } + val dfRepartitioned = applyPartitioning(dfIn, partitionInfo, numberOfRecordsEstimate) writeAndCleanOnFailure(dfRepartitioned, outputDirStr, fsUtils, saveMode) @@ -191,15 +185,6 @@ class MetastorePersistenceParquet(path: String, } } - def applyRepartitioning(dfIn: DataFrame, recordCount: Long): DataFrame = { - recordsPerPartition match { - case None => dfIn - case Some(rpp) => - val numPartitions = Math.max(1, Math.ceil(recordCount.toDouble / rpp)).toInt - dfIn.repartition(numPartitions) - } - } - private[core] def writeAndCleanOnFailure(df: DataFrame, outputDirStr: String, fsUtils: FsUtils, @@ -231,3 +216,17 @@ class MetastorePersistenceParquet(path: String, } } } + +object 
MetastorePersistenceParquet {
+  def applyPartitioning(dfIn: DataFrame, partitionInfo: PartitionInfo, recordCountEstimate: Option[Long]): DataFrame = {
+    partitionInfo match {
+      case PartitionInfo.Default => dfIn
+      case PartitionInfo.Explicit(nop) =>
+        dfIn.coalesce(nop)
+      case PartitionInfo.PerRecordCount(rpp) =>
+        val recordCount = recordCountEstimate.getOrElse(dfIn.count())
+        val numPartitions = Math.max(1, Math.ceil(recordCount.toDouble / rpp)).toInt
+        dfIn.repartition(numPartitions)
+    }
+  }
+}
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/PythonTransformationJob.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/PythonTransformationJob.scala
index 66d907b46..0c355dc54 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/PythonTransformationJob.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/PythonTransformationJob.scala
@@ -19,13 +19,12 @@ package za.co.absa.pramen.core.pipeline
 import com.typesafe.config.Config
 import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession}
 import za.co.absa.pramen.api.status.{DependencyWarning, JobType, TaskRunReason}
-import za.co.absa.pramen.api.{DataFormat, Reason}
+import za.co.absa.pramen.api.{DataFormat, PartitionInfo, Reason}
 import za.co.absa.pramen.core.app.config.GeneralConfig.TEMPORARY_DIRECTORY_KEY
 import za.co.absa.pramen.core.bookkeeper.Bookkeeper
 import za.co.absa.pramen.core.databricks.{DatabricksClient, PramenPyJobTemplate}
 import za.co.absa.pramen.core.exceptions.ProcessFailedException
 import za.co.absa.pramen.core.metastore.Metastore
-import za.co.absa.pramen.core.metastore.MetastoreImpl.DEFAULT_RECORDS_PER_PARTITION
 import za.co.absa.pramen.core.metastore.model.MetaTable
 import za.co.absa.pramen.core.pipeline.PythonTransformationJob._
 import za.co.absa.pramen.core.process.ProcessRunner
@@ -250,8 +249,8 @@ class PythonTransformationJob(operationDef: OperationDef,
   def getTable(mt: MetaTable): String = {
     val description = if (mt.description.isEmpty) "" else s"\n    description: ${escapeString(mt.description)}"
     val recordsPerPartition = mt.format match {
-      case f: DataFormat.Parquet => s"\n    records_per_partition: ${f.recordsPerPartition.getOrElse(DEFAULT_RECORDS_PER_PARTITION)}"
-      case f: DataFormat.Delta => s"\n    records_per_partition: ${f.recordsPerPartition.getOrElse(DEFAULT_RECORDS_PER_PARTITION)}"
+      case f: DataFormat.Parquet => getPartitionYaml(f.partitionInfo)
+      case f: DataFormat.Delta => getPartitionYaml(f.partitionInfo)
       case _ => ""
     }
 
@@ -282,6 +281,17 @@ class PythonTransformationJob(operationDef: OperationDef,
     sb.toString
   }
 
+  private[core] def getPartitionYaml(partitionInfo: PartitionInfo): String = {
+    partitionInfo match {
+      case PartitionInfo.Default =>
+        ""
+      case PartitionInfo.Explicit(nop) =>
+        s"\n    number_of_partitions: $nop"
+      case PartitionInfo.PerRecordCount(rpp) =>
+        s"\n    records_per_partition: $rpp"
+    }
+  }
+
   private[core] def getTemporaryPathForYamlConfig(conf: Config) = {
     val temporaryDirectoryBase = if (conf.hasPath(TEMPORARY_DIRECTORY_KEY) && conf.getString(TEMPORARY_DIRECTORY_KEY).nonEmpty) {
       conf.getString(TEMPORARY_DIRECTORY_KEY).stripSuffix("/")
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/model/DataFormatSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/model/DataFormatSuite.scala
index 42fc39d14..8455e3aba 100644
--- a/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/model/DataFormatSuite.scala
+++ 
b/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/model/DataFormatSuite.scala @@ -19,7 +19,7 @@ package za.co.absa.pramen.core.metastore.model import com.typesafe.config.ConfigFactory import org.scalatest.wordspec.AnyWordSpec import za.co.absa.pramen.api.DataFormat._ -import za.co.absa.pramen.api.{CachePolicy, Query} +import za.co.absa.pramen.api.{CachePolicy, PartitionInfo, Query} import za.co.absa.pramen.core.metastore.model.DataFormatParser.{PATH_KEY, TABLE_KEY} class DataFormatSuite extends AnyWordSpec { @@ -33,10 +33,10 @@ class DataFormatSuite extends AnyWordSpec { assert(!format.isTransient) assert(format.isInstanceOf[Parquet]) assert(format.asInstanceOf[Parquet].path == "/a/b/c") - assert(format.asInstanceOf[Parquet].recordsPerPartition.isEmpty) + assert(format.asInstanceOf[Parquet].partitionInfo == PartitionInfo.Default) } - "use 'parquet' when specified explicitly" in { + "use 'parquet' when rpp specified explicitly" in { val conf = ConfigFactory.parseString( """format = parquet |path = /a/b/c @@ -49,10 +49,26 @@ class DataFormatSuite extends AnyWordSpec { assert(!format.isTransient) assert(format.isInstanceOf[Parquet]) assert(format.asInstanceOf[Parquet].path == "/a/b/c") - assert(format.asInstanceOf[Parquet].recordsPerPartition.contains(100)) + assert(format.asInstanceOf[Parquet].partitionInfo == PartitionInfo.PerRecordCount(100L)) } - "use 'delta' when specified explicitly" in { + "use 'parquet' when npp specified explicitly" in { + val conf = ConfigFactory.parseString( + """format = parquet + |path = /a/b/c + |number.of.partitions = 10 + |""".stripMargin) + + val format = DataFormatParser.fromConfig(conf, conf) + + assert(format.name == "parquet") + assert(!format.isTransient) + assert(format.isInstanceOf[Parquet]) + assert(format.asInstanceOf[Parquet].path == "/a/b/c") + assert(format.asInstanceOf[Parquet].partitionInfo == PartitionInfo.Explicit(10)) + } + + "use 'delta' when rpp specified explicitly" in { val conf = ConfigFactory.parseString( """format = delta |path = /a/b/c @@ -66,7 +82,24 @@ class DataFormatSuite extends AnyWordSpec { assert(format.isInstanceOf[Delta]) assert(format.asInstanceOf[Delta].query.isInstanceOf[Query.Path]) assert(format.asInstanceOf[Delta].query.query == "/a/b/c") - assert(format.asInstanceOf[Delta].recordsPerPartition.contains(200)) + assert(format.asInstanceOf[Delta].partitionInfo == PartitionInfo.PerRecordCount(200L)) + } + + "use 'delta' when npp specified explicitly" in { + val conf = ConfigFactory.parseString( + """format = delta + |path = /a/b/c + |number.of.partitions = 10 + |""".stripMargin) + + val format = DataFormatParser.fromConfig(conf, conf) + + assert(format.name == "delta") + assert(!format.isTransient) + assert(format.isInstanceOf[Delta]) + assert(format.asInstanceOf[Delta].query.isInstanceOf[Query.Path]) + assert(format.asInstanceOf[Delta].query.query == "/a/b/c") + assert(format.asInstanceOf[Delta].partitionInfo == PartitionInfo.Explicit(10)) } "use 'raw' when specified explicitly" in { @@ -151,7 +184,7 @@ class DataFormatSuite extends AnyWordSpec { assert(format.isInstanceOf[Delta]) assert(format.asInstanceOf[Delta].query.isInstanceOf[Query.Path]) assert(format.asInstanceOf[Delta].query.query == "/a/b/c") - assert(format.asInstanceOf[Delta].recordsPerPartition.contains(100)) + assert(format.asInstanceOf[Delta].partitionInfo == PartitionInfo.PerRecordCount(100)) } "throw an exception on unknown format" in { diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/model/HiveConfigSuite.scala 
b/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/model/HiveConfigSuite.scala index 7edf8133e..4992dbbfb 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/model/HiveConfigSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/model/HiveConfigSuite.scala @@ -35,7 +35,7 @@ class HiveConfigSuite extends AnyWordSpec { alwaysEscapeColumnNames = false, optimizeExistQuery = true) - val hiveConfig = HiveConfig.fromConfigWithDefaults(conf, defaultConfig, DataFormat.Parquet("dummy", None)) + val hiveConfig = HiveConfig.fromConfigWithDefaults(conf, defaultConfig, DataFormat.Parquet("dummy")) assert(hiveConfig.hiveApi == HiveApi.SparkCatalog) assert(hiveConfig.database.contains("mydb1")) @@ -82,7 +82,7 @@ class HiveConfigSuite extends AnyWordSpec { alwaysEscapeColumnNames = false, optimizeExistQuery = true) - val hiveConfig = HiveConfig.fromConfigWithDefaults(conf, defaultConfig, DataFormat.Parquet("dummy", None)) + val hiveConfig = HiveConfig.fromConfigWithDefaults(conf, defaultConfig, DataFormat.Parquet("dummy")) assert(hiveConfig.hiveApi == HiveApi.SparkCatalog) assert(hiveConfig.database.contains("mydb2")) @@ -109,7 +109,7 @@ class HiveConfigSuite extends AnyWordSpec { alwaysEscapeColumnNames = true, optimizeExistQuery = true) - val hiveConfig = HiveConfig.fromDefaults(defaultConfig, DataFormat.Parquet("dummy", None)) + val hiveConfig = HiveConfig.fromDefaults(defaultConfig, DataFormat.Parquet("dummy")) assert(hiveConfig.hiveApi == HiveApi.Sql) assert(hiveConfig.database.contains("mydb")) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/model/MetaTableSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/model/MetaTableSuite.scala index a894eac37..1bc78bd12 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/model/MetaTableSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/model/MetaTableSuite.scala @@ -20,6 +20,7 @@ import com.typesafe.config.ConfigFactory import org.apache.spark.sql.SaveMode import org.scalatest.wordspec.AnyWordSpec import za.co.absa.pramen.api.DataFormat.Parquet +import za.co.absa.pramen.api.PartitionInfo import za.co.absa.pramen.core.app.config.InfoDateConfig import za.co.absa.pramen.core.reader.model.JdbcConfig import za.co.absa.pramen.core.utils.hive.HiveQueryTemplates @@ -247,7 +248,7 @@ class MetaTableSuite extends AnyWordSpec { assert(metaTable.hiveConfig.templates.dropTableTemplate == "drop") assert(metaTable.hiveConfig.ignoreFailures) assert(metaTable.format.name == "parquet") - assert(metaTable.format.asInstanceOf[Parquet].recordsPerPartition.contains(100)) + assert(metaTable.format.asInstanceOf[Parquet].partitionInfo == PartitionInfo.PerRecordCount(100)) assert(metaTable.hiveTable.contains("my_hive_table")) assert(metaTable.hivePath.contains("/d/e/f")) assert(!metaTable.hivePreferAddPartition) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/persistence/MetastorePersistenceParquetSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/persistence/MetastorePersistenceParquetSuite.scala index 1c321f173..d96d7d957 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/persistence/MetastorePersistenceParquetSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/persistence/MetastorePersistenceParquetSuite.scala @@ -20,6 +20,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.sql.{DataFrame, SaveMode} import org.mockito.Mockito.{mock, 
when} import org.scalatest.wordspec.AnyWordSpec +import za.co.absa.pramen.api.PartitionInfo import za.co.absa.pramen.core.base.SparkTestBase import za.co.absa.pramen.core.fixtures.{TempDirFixture, TextComparisonFixture} import za.co.absa.pramen.core.metastore.peristence.MetastorePersistenceParquet @@ -39,7 +40,7 @@ class MetastorePersistenceParquetSuite extends AnyWordSpec with SparkTestBase wi withTempDirectory("metastore_parquet") { tempDir => val outputPath = new Path(tempDir, "partition=10") - val persistence = new MetastorePersistenceParquet(tempDir, "ignore", "yyyy-MM-dd", "batchid", 0, None, None, Map.empty, Map.empty) + val persistence = new MetastorePersistenceParquet(tempDir, "ignore", "yyyy-MM-dd", "batchid", 0, PartitionInfo.Default, None, Map.empty, Map.empty) persistence.writeAndCleanOnFailure(exampleDf, outputPath.toString, fsUtils, SaveMode.Overwrite) @@ -52,7 +53,7 @@ class MetastorePersistenceParquetSuite extends AnyWordSpec with SparkTestBase wi withTempDirectory("metastore_parquet") { tempDir => val outputPath = new Path(tempDir, "partition=10") - val persistence = new MetastorePersistenceParquet(tempDir, "ignore", "yyyy-MM-dd", "batchid", 0, None, Some(SaveMode.Append), Map.empty, Map.empty) + val persistence = new MetastorePersistenceParquet(tempDir, "ignore", "yyyy-MM-dd", "batchid", 0, PartitionInfo.Default, Some(SaveMode.Append), Map.empty, Map.empty) persistence.writeAndCleanOnFailure(exampleDf, outputPath.toString, fsUtils, SaveMode.Append) persistence.writeAndCleanOnFailure(exampleDf, outputPath.toString, fsUtils, SaveMode.Append) @@ -71,7 +72,7 @@ class MetastorePersistenceParquetSuite extends AnyWordSpec with SparkTestBase wi when(df.write).thenThrow(new RuntimeException("test exception")) - val persistence = new MetastorePersistenceParquet("dummy", "ignore", "yyyy-MM-dd", "batchid", 0, None, None, Map.empty, Map.empty) + val persistence = new MetastorePersistenceParquet("dummy", "ignore", "yyyy-MM-dd", "batchid", 0, PartitionInfo.Default, None, Map.empty, Map.empty) assertThrows[RuntimeException] { persistence.writeAndCleanOnFailure(df, "dummy", fsUtils, SaveMode.Overwrite) @@ -88,7 +89,7 @@ class MetastorePersistenceParquetSuite extends AnyWordSpec with SparkTestBase wi fsUtils.createDirectoryRecursive(outputPath) - val persistence = new MetastorePersistenceParquet("dummy", "ignore", "yyyy-MM-dd", "batchid", 0, None, None, Map.empty, Map.empty) + val persistence = new MetastorePersistenceParquet("dummy", "ignore", "yyyy-MM-dd", "batchid", 0, PartitionInfo.Default, None, Map.empty, Map.empty) val ex = intercept[RuntimeException] { persistence.writeAndCleanOnFailure(df, outputPath.toString, fsUtils, SaveMode.Overwrite) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/persistence/MetastorePersistenceSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/persistence/MetastorePersistenceSuite.scala index d77da4e0b..8dd2514b6 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/persistence/MetastorePersistenceSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/persistence/MetastorePersistenceSuite.scala @@ -21,7 +21,7 @@ import org.apache.spark.sql.functions.{col, lit} import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode} import org.scalatest.Assertion import org.scalatest.wordspec.AnyWordSpec -import za.co.absa.pramen.api.{CachePolicy, DataFormat, Query} +import za.co.absa.pramen.api.{CachePolicy, DataFormat, PartitionInfo, Query} import 
za.co.absa.pramen.core.base.SparkTestBase import za.co.absa.pramen.core.fixtures.{TempDirFixture, TextComparisonFixture} import za.co.absa.pramen.core.metastore.peristence.{MetastorePersistence, MetastorePersistenceDelta, MetastorePersistenceParquet, MetastorePersistenceTransientEager} @@ -358,6 +358,14 @@ class MetastorePersistenceSuite extends AnyWordSpec with SparkTestBase with Temp assert(files.size == 2) } + def testNumberOfPartitions(tempDir: String, mask: String, mtp: MetastorePersistence): Assertion = { + mtp.saveTable(infoDate, getDf, None) + + val files = LocalFsUtils.getListOfFiles(Paths.get(tempDir), mask) + + assert(files.size == 1) + } + def testPathCreation(mtp: MetastorePersistence): Assertion = { val expected = """[ { @@ -445,7 +453,13 @@ class MetastorePersistenceSuite extends AnyWordSpec with SparkTestBase with Temp "support records per partition" in { withTempDirectory("mt_persist") { tempDir => - testRecordsPerPartition(s"$tempDir/parquet/info_date=2021-10-12", "*.parquet", getParquetMtPersistence(tempDir, recordsPerPartition = Some(2))) + testRecordsPerPartition(s"$tempDir/parquet/info_date=2021-10-12", "*.parquet", getParquetMtPersistence(tempDir, partitionInfo = PartitionInfo.PerRecordCount(2))) + } + } + + "support number of partitions" in { + withTempDirectory("mt_persist") { tempDir => + testNumberOfPartitions(s"$tempDir/parquet/info_date=2021-10-12", "*.parquet", getParquetMtPersistence(tempDir, partitionInfo = PartitionInfo.Explicit(1))) } } @@ -592,7 +606,13 @@ class MetastorePersistenceSuite extends AnyWordSpec with SparkTestBase with Temp "supports records per partition" in { withTempDirectory("mt_persist") { tempDir => - testRecordsPerPartition(s"$tempDir/delta/info_date=2021-10-12", "*.parquet", getDeltaMtPersistence(tempDir, recordsPerPartition = Some(2))) + testRecordsPerPartition(s"$tempDir/delta/info_date=2021-10-12", "*.parquet", getDeltaMtPersistence(tempDir, partitionInfo = PartitionInfo.PerRecordCount(2))) + } + } + + "supports number of partitions" in { + withTempDirectory("mt_persist") { tempDir => + testNumberOfPartitions(s"$tempDir/delta/info_date=2021-10-12", "*.parquet", getDeltaMtPersistence(tempDir, partitionInfo = PartitionInfo.Explicit(1))) } } @@ -643,12 +663,12 @@ class MetastorePersistenceSuite extends AnyWordSpec with SparkTestBase with Temp } def getParquetMtPersistence(tempDir: String, - recordsPerPartition: Option[Long] = None, + partitionInfo: PartitionInfo = PartitionInfo.Default, pathSuffix: String = "parquet", saveModeOpt: Option[SaveMode] = None): MetastorePersistence = { val mt = MetaTableFactory.getDummyMetaTable(name = "table1", - format = DataFormat.Parquet(s"$tempDir/$pathSuffix", recordsPerPartition), + format = DataFormat.Parquet(s"$tempDir/$pathSuffix", partitionInfo), infoDateColumn = infoDateColumn, infoDateFormat = infoDateFormat, saveModeOpt = saveModeOpt @@ -659,12 +679,12 @@ class MetastorePersistenceSuite extends AnyWordSpec with SparkTestBase with Temp def getDeltaMtPersistence(tempDir: String, partitionByInfoDate: Boolean = true, - recordsPerPartition: Option[Long] = None, + partitionInfo: PartitionInfo = PartitionInfo.Default, pathSuffix: String = "delta", saveModeOpt: Option[SaveMode] = None, writeOptions: Map[String, String] = Map.empty[String, String]): MetastorePersistence = { val mt = MetaTableFactory.getDummyMetaTable(name = "table1", - format = DataFormat.Delta(Query.Path(s"$tempDir/$pathSuffix"), recordsPerPartition), + format = DataFormat.Delta(Query.Path(s"$tempDir/$pathSuffix"), partitionInfo), 
partitionByInfoDate = partitionByInfoDate, infoDateColumn = infoDateColumn, infoDateFormat = infoDateFormat, diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/MetaTableFactory.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/MetaTableFactory.scala index 9f25bf535..a0b484e72 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/MetaTableFactory.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/MetaTableFactory.scala @@ -25,7 +25,7 @@ import java.time.LocalDate object MetaTableFactory { def getDummyMetaTable(name: String = "dummy", description: String = "description", - format: DataFormat = DataFormat.Parquet("/tmp/dummy", None), + format: DataFormat = DataFormat.Parquet("/tmp/dummy"), infoDateColumn: String = "INFO_DATE", infoDateFormat: String = "yyyy-MM-dd", partitionByInfoDate: Boolean = true, diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/job/JobSpy.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/job/JobSpy.scala index 8f920784e..d42246969 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/job/JobSpy.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/job/JobSpy.scala @@ -33,7 +33,7 @@ import java.time.{Instant, LocalDate} class JobSpy(jobName: String = "Dummy Job", outputTableIn: String = "table_out", - outputTableFormat: DataFormat = DataFormat.Parquet("/tmp/dummy", None), + outputTableFormat: DataFormat = DataFormat.Parquet("/tmp/dummy"), hiveTable: Option[String] = None, operationDef: OperationDef = OperationDefFactory.getDummyOperationDef(), preRunCheckFunction: () => JobPreRunResult = () => JobPreRunResult(JobPreRunStatus.Ready, None, Nil, Nil), diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala index d9858bf5a..393886d23 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala @@ -20,7 +20,7 @@ import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{DataFrame, SaveMode} import za.co.absa.pramen.api.offset.DataOffset import za.co.absa.pramen.api.status.TaskRunReason -import za.co.absa.pramen.api.{MetaTableDef, MetaTableRunInfo, MetadataManager, MetastoreReader} +import za.co.absa.pramen.api._ import za.co.absa.pramen.core.metadata.MetadataManagerNull import za.co.absa.pramen.core.metastore.model.{MetaTable, ReaderMode, TrackingTable} import za.co.absa.pramen.core.metastore.{MetaTableStats, Metastore, MetastoreReaderIncremental, TableNotConfigured} @@ -33,6 +33,7 @@ import scala.collection.mutable.ListBuffer class MetastoreSpy(registeredTables: Seq[String] = Seq("table1", "table2"), availableDates: Seq[LocalDate] = Seq(LocalDate.of(2022, 2, 17)), + dataFormat: DataFormat = DataFormat.Parquet("/tmp/dummy"), tableDf: DataFrame = null, tableException: Throwable = null, stats: MetaTableStats = MetaTableStats(Some(0), None, None), @@ -52,7 +53,7 @@ class MetastoreSpy(registeredTables: Seq[String] = Seq("table1", "table2"), override def getRegisteredTables: Seq[String] = registeredTables override def getRegisteredMetaTables: Seq[MetaTable] = registeredTables - .map(t => MetaTableFactory.getDummyMetaTable(t, readOptions = readOptions, writeOptions = writeOptions)) + .map(t => MetaTableFactory.getDummyMetaTable(t, format = dataFormat, readOptions = readOptions, writeOptions 
= writeOptions)) override def isTableAvailable(tableName: String, infoDate: LocalDate): Boolean = registeredTables.contains(tableName) && availableDates.contains(infoDate) @@ -63,7 +64,7 @@ class MetastoreSpy(registeredTables: Seq[String] = Seq("table1", "table2"), isTableAvailable } - override def getTableDef(tableName: String): MetaTable = MetaTableFactory.getDummyMetaTable(name = tableName, trackDays = trackDays) + override def getTableDef(tableName: String): MetaTable = MetaTableFactory.getDummyMetaTable(name = tableName, trackDays = trackDays, format = dataFormat) override def getTable(tableName: String, infoDateFrom: Option[LocalDate], infoDateTo: Option[LocalDate]): DataFrame = { if (tableException != null) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/sink/CmdLineSinkSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/sink/CmdLineSinkSuite.scala index b8b2935a9..d41a53ce5 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/sink/CmdLineSinkSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/sink/CmdLineSinkSuite.scala @@ -99,7 +99,7 @@ class CmdLineSinkSuite extends AnyWordSpec with SparkTestBase with TempDirFixtur withTempDirectory("cmd_sink") { tempDir => val metastoreReader = mock(classOf[MetastoreReader]) val metatable = MetaTableDefFactory.getDummyMetaTableDef(name = "table1", - format = DataFormat.Parquet(tempDir, None) + format = DataFormat.Parquet(tempDir) ) when(metastoreReader.getTableDef("table1")).thenReturn(metatable) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/PythonTransformationJobSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/PythonTransformationJobSuite.scala index f68fedea5..2a136c63a 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/PythonTransformationJobSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/PythonTransformationJobSuite.scala @@ -21,8 +21,8 @@ import org.apache.spark.sql.DataFrame import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException import org.scalatest.BeforeAndAfterAll import org.scalatest.wordspec.AnyWordSpec -import za.co.absa.pramen.api.Reason import za.co.absa.pramen.api.status.TaskRunReason +import za.co.absa.pramen.api.{DataFormat, PartitionInfo, Reason} import za.co.absa.pramen.core.OperationDefFactory import za.co.absa.pramen.core.base.SparkTestBase import za.co.absa.pramen.core.exceptions.ProcessFailedException @@ -368,7 +368,6 @@ class PythonTransformationJobSuite extends AnyWordSpec with BeforeAndAfterAll wi | description: description | format: parquet | path: /tmp/dummy - | records_per_partition: 500000 | info_date_settings: | column: INFO_DATE | format: yyyy-MM-dd @@ -379,7 +378,6 @@ class PythonTransformationJobSuite extends AnyWordSpec with BeforeAndAfterAll wi | description: description | format: parquet | path: /tmp/dummy - | records_per_partition: 500000 | info_date_settings: | column: INFO_DATE | format: yyyy-MM-dd @@ -413,7 +411,7 @@ class PythonTransformationJobSuite extends AnyWordSpec with BeforeAndAfterAll wi | description: description | format: parquet | path: /tmp/dummy - | records_per_partition: 500000 + | records_per_partition: 100000 | info_date_settings: | column: INFO_DATE | format: yyyy-MM-dd @@ -424,7 +422,7 @@ class PythonTransformationJobSuite extends AnyWordSpec with BeforeAndAfterAll wi | description: description | format: parquet | path: /tmp/dummy - | records_per_partition: 500000 + | records_per_partition: 100000 
| info_date_settings: | column: INFO_DATE | format: yyyy-MM-dd @@ -433,7 +431,7 @@ class PythonTransformationJobSuite extends AnyWordSpec with BeforeAndAfterAll wi | writer_options: {} |""".stripMargin - val (job, _, _, _) = getUseCase() + val (job, _, _, _) = getUseCase(partitionInfo = PartitionInfo.PerRecordCount(100000L)) val actual = job.getYamlConfig(infoDate) @@ -457,7 +455,7 @@ class PythonTransformationJobSuite extends AnyWordSpec with BeforeAndAfterAll wi | description: description | format: parquet | path: /tmp/dummy - | records_per_partition: 500000 + | number_of_partitions: 10 | info_date_settings: | column: INFO_DATE | format: yyyy-MM-dd @@ -468,7 +466,7 @@ class PythonTransformationJobSuite extends AnyWordSpec with BeforeAndAfterAll wi | description: description | format: parquet | path: /tmp/dummy - | records_per_partition: 500000 + | number_of_partitions: 10 | info_date_settings: | column: INFO_DATE | format: yyyy-MM-dd @@ -478,6 +476,7 @@ class PythonTransformationJobSuite extends AnyWordSpec with BeforeAndAfterAll wi |""".stripMargin val (job, _, _, _) = getUseCase( + partitionInfo = PartitionInfo.Explicit(10), sparkConfig = Map[String, String]( "spark.driver.host" -> "127.0.0.1", "spark.executor.instances" -> "1", @@ -507,7 +506,6 @@ class PythonTransformationJobSuite extends AnyWordSpec with BeforeAndAfterAll wi | description: description | format: parquet | path: /tmp/dummy - | records_per_partition: 500000 | info_date_settings: | column: INFO_DATE | format: yyyy-MM-dd @@ -520,7 +518,6 @@ class PythonTransformationJobSuite extends AnyWordSpec with BeforeAndAfterAll wi | description: description | format: parquet | path: /tmp/dummy - | records_per_partition: 500000 | info_date_settings: | column: INFO_DATE | format: yyyy-MM-dd @@ -569,6 +566,7 @@ class PythonTransformationJobSuite extends AnyWordSpec with BeforeAndAfterAll wi tableException: Throwable = null, stats: MetaTableStats = null, statsException: Throwable = null, + partitionInfo: PartitionInfo = PartitionInfo.Default, sparkConfig: Map[String, String] = Map.empty[String, String], extraOptions: Map[String, String] = Map.empty[String, String], readOptions: Map[String, String] = Map.empty[String, String], @@ -576,6 +574,7 @@ class PythonTransformationJobSuite extends AnyWordSpec with BeforeAndAfterAll wi val bk = new SyncBookkeeperMock val metastore = new MetastoreSpy( tableDf = tableDf, + dataFormat = DataFormat.Parquet("/tmp/dummy", partitionInfo), tableException = tableException, stats = stats, statsException = statsException, diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/source/SourceValidationSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/source/SourceValidationSuite.scala index ca30b32ab..ddac2fb9f 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/source/SourceValidationSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/source/SourceValidationSuite.scala @@ -142,7 +142,7 @@ class SourceValidationSuite extends AnyWordSpec with BeforeAndAfterAll with Temp val sourceTable = SourceTableFactory.getDummySourceTable() val table1Path = new Path(tempDir, "table1") - val table1Format = DataFormat.Parquet(table1Path.toString, None) + val table1Format = DataFormat.Parquet(table1Path.toString) val metaTable = MetaTableFactory.getDummyMetaTable(name = "table1", format = table1Format) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/hive/HiveHelperSuite.scala 
b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/hive/HiveHelperSuite.scala index 2a3065618..98bb9da08 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/hive/HiveHelperSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/hive/HiveHelperSuite.scala @@ -58,7 +58,7 @@ class HiveHelperSuite extends AnyWordSpec with SparkTestBase { val hiveDefaultConfig = HiveDefaultConfig.getNullConfig - val hiveConfig = HiveConfig.fromConfigWithDefaults(conf, hiveDefaultConfig, DataFormat.Parquet("Dummy", None)) + val hiveConfig = HiveConfig.fromConfigWithDefaults(conf, hiveDefaultConfig, DataFormat.Parquet("Dummy")) val hiveHelper = HiveHelper.fromHiveConfig(hiveConfig).asInstanceOf[HiveHelperSql] diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/transformers/ConversionTransformerSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/transformers/ConversionTransformerSuite.scala index 74cc41544..2c6aa628f 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/transformers/ConversionTransformerSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/transformers/ConversionTransformerSuite.scala @@ -138,7 +138,7 @@ class ConversionTransformerSuite extends AnyWordSpec with SparkTestBase with Tem val emptyDf = filesDf.filter($"path" === "_") val tableFormat = if (useWrongFormat) { - DataFormat.Parquet(basePath.toString, None) + DataFormat.Parquet(basePath.toString) } else { DataFormat.Raw(basePath.toString) } diff --git a/pramen/extras/src/test/scala/za/co/absa/pramen/extras/notification/EcsNotificationTargetSuite.scala b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/notification/EcsNotificationTargetSuite.scala index c671ec71c..c15ca8032 100644 --- a/pramen/extras/src/test/scala/za/co/absa/pramen/extras/notification/EcsNotificationTargetSuite.scala +++ b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/notification/EcsNotificationTargetSuite.scala @@ -20,7 +20,6 @@ import com.typesafe.config.ConfigFactory import org.apache.hadoop.fs.Path import org.scalatest.wordspec.AnyWordSpec import za.co.absa.pramen.api.DataFormat -import za.co.absa.pramen.api.status.TaskDef import za.co.absa.pramen.extras.TaskDefFactory import za.co.absa.pramen.extras.mocks.{SimpleHttpClientSpy, TestPrototypes} import za.co.absa.pramen.extras.notification.EcsNotificationTarget.{ECS_API_SECRET_KEY, ECS_API_TRUST_SSL_KEY, ECS_API_URL_KEY} @@ -36,7 +35,7 @@ class EcsNotificationTargetSuite extends AnyWordSpec { |""".stripMargin ) - private val dataFormat = DataFormat.Parquet("s3a://dummy_bucket_not_exist/dummy/path", None) + private val dataFormat = DataFormat.Parquet("s3a://dummy_bucket_not_exist/dummy/path") private val metaTableDef = TestPrototypes.metaTableDef.copy(format = dataFormat) "sendNotification" should { diff --git a/pramen/extras/src/test/scala/za/co/absa/pramen/extras/notification/EcsPipelineNotificationTargetSuite.scala b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/notification/EcsPipelineNotificationTargetSuite.scala index 0207eea9e..06c303e08 100644 --- a/pramen/extras/src/test/scala/za/co/absa/pramen/extras/notification/EcsPipelineNotificationTargetSuite.scala +++ b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/notification/EcsPipelineNotificationTargetSuite.scala @@ -36,7 +36,7 @@ class EcsPipelineNotificationTargetSuite extends AnyWordSpec { |""".stripMargin ) - private val dataFormat = DataFormat.Parquet("s3a://dummy_bucket_not_exist/dummy/path", None) + private val dataFormat = 
DataFormat.Parquet("s3a://dummy_bucket_not_exist/dummy/path") private val metaTableDef = TestPrototypes.metaTableDef.copy(format = dataFormat) "sendNotification" should { @@ -47,8 +47,8 @@ class EcsPipelineNotificationTargetSuite extends AnyWordSpec { override protected def getHttpClient(trustAllSslCerts: Boolean): SimpleHttpClient = httpClient } - val dataFormat2 = DataFormat.Parquet("s3a://dummy_bucket_not_exist/dummy/path2", None) - val dataFormat3 = DataFormat.Delta(Query.Table("table2"), None) + val dataFormat2 = DataFormat.Parquet("s3a://dummy_bucket_not_exist/dummy/path2") + val dataFormat3 = DataFormat.Delta(Query.Table("table2")) val metaTableDef2 = TestPrototypes.metaTableDef.copy(name = "table2", format = dataFormat2) val metaTableDef3 = TestPrototypes.metaTableDef.copy(name = "table3", format = dataFormat3)