From 10943bb8c3224ee1d75a964802b01b7e263e6cfb Mon Sep 17 00:00:00 2001
From: Ruslan Iushchenko
Date: Fri, 19 Jan 2024 15:22:25 +0100
Subject: [PATCH] #245 Run Spark operation outside of synchronized() block.

---
 .../peristence/TransientTableManager.scala    | 53 +++++++++++++------
 1 file changed, 37 insertions(+), 16 deletions(-)

diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/TransientTableManager.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/TransientTableManager.scala
index 1f46ad0a5..4a4ea6739 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/TransientTableManager.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/TransientTableManager.scala
@@ -92,27 +92,48 @@ object TransientTableManager {
     rawDataframes.contains(partition) || cachedDataframes.contains(partition) || persistedLocations.contains(partition)
   }
 
-  private[core] def getDataForTheDate(tableName: String, infoDate: LocalDate)(implicit spark: SparkSession): DataFrame = synchronized {
+  private[core] def getDataForTheDate(tableName: String, infoDate: LocalDate)(implicit spark: SparkSession): DataFrame = {
     val partition = getMetastorePartition(tableName, infoDate)
 
-    if (rawDataframes.contains(partition)) {
-      log.info(s"Using non-cached dataframe for '$tableName' for '$infoDate'...")
-      rawDataframes(partition)
-    } else if (cachedDataframes.contains(partition)) {
-      log.info(s"Using cached dataframe for '$tableName' for '$infoDate'...")
-      cachedDataframes(partition)
-    } else if (persistedLocations.contains(partition)) {
-      val path = persistedLocations(partition)
-      log.info(s"Reading persisted transient table from $path...")
-      spark.read.parquet(path)
-    } else {
-      if (schemas.contains(tableName.toLowerCase)) {
-        val schema = schemas(tableName.toLowerCase)
+    this.synchronized{
+      if (rawDataframes.contains(partition)) {
+        log.info(s"Using non-cached dataframe for '$tableName' for '$infoDate'...")
+        return rawDataframes(partition)
+      }
+    }
+
+    this.synchronized{
+      if (cachedDataframes.contains(partition)) {
+        log.info(s"Using cached dataframe for '$tableName' for '$infoDate'...")
+        return cachedDataframes(partition)
+      }
+    }
+
+    val pathOpt = this.synchronized{
+      if (persistedLocations.contains(partition)) {
+        val path = persistedLocations(partition)
+        log.info(s"Reading persisted transient table from $path...")
+        Option(path)
+      } else {
+        None
+      }
+    }
+
+    pathOpt match {
+      case Some(path) => return spark.read.parquet(path)
+      case None => // nothing to do
+    }
+
+    val schemaOpt = this.synchronized {
+      schemas.get(tableName.toLowerCase)
+    }
+
+    schemaOpt match {
+      case Some(schema) =>
         val emptyRDD = spark.sparkContext.emptyRDD[Row]
         spark.createDataFrame(emptyRDD, schema)
-      } else {
+      case None =>
         throw new IllegalStateException(s"No data for transient table '$tableName' for '$infoDate'")
-      }
     }
   }
 