Skip to content

Commit

Permalink
#245 Run Spark operation outside of synchronized() block.
Browse files Browse the repository at this point in the history
  • Loading branch information
yruslan committed Jan 19, 2024
1 parent 19dde82 commit 10943bb
Showing 1 changed file with 37 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,27 +92,48 @@ object TransientTableManager {
rawDataframes.contains(partition) || cachedDataframes.contains(partition) || persistedLocations.contains(partition)
}

/**
  * Returns the data of a transient table for the given info date.
  *
  * Lookup order: non-cached (raw) dataframe, cached dataframe, a persisted
  * parquet location, and finally an empty dataframe if only the schema is
  * registered for the table.
  *
  * Shared state is only read under the lock; Spark operations (reading
  * parquet, creating a dataframe) run outside of synchronized blocks so
  * that other threads are not blocked on potentially slow I/O.
  *
  * Note: no `return` is used inside `synchronized` blocks. `synchronized`
  * takes a by-name argument, so a `return` there is a nonlocal return
  * implemented by throwing `NonLocalReturnControl`.
  *
  * @param tableName the name of the transient metastore table.
  * @param infoDate  the info date of the requested partition.
  * @return the dataframe for the requested partition.
  * @throws IllegalStateException if neither data nor a schema is registered for the table.
  */
private[core] def getDataForTheDate(tableName: String, infoDate: LocalDate)(implicit spark: SparkSession): DataFrame = {
  val partition = getMetastorePartition(tableName, infoDate)

  // First lock acquisition: check the in-memory dataframes. Handing an
  // already-built dataframe back is cheap, so doing it under the lock is fine.
  val inMemoryDf = this.synchronized {
    rawDataframes.get(partition).map { df =>
      log.info(s"Using non-cached dataframe for '$tableName' for '$infoDate'...")
      df
    }.orElse {
      cachedDataframes.get(partition).map { df =>
        log.info(s"Using cached dataframe for '$tableName' for '$infoDate'...")
        df
      }
    }
  }

  inMemoryDf.getOrElse {
    // Second lock acquisition: snapshot the persisted path (Left), or fall
    // back to the registered schema, if any (Right). The actual Spark work
    // happens after the lock is released.
    val pathOrSchema = this.synchronized {
      persistedLocations.get(partition).toLeft(schemas.get(tableName.toLowerCase))
    }

    pathOrSchema match {
      case Left(path) =>
        log.info(s"Reading persisted transient table from $path...")
        spark.read.parquet(path)
      case Right(Some(schema)) =>
        // Only the schema is known — return an empty dataframe with that schema.
        val emptyRDD = spark.sparkContext.emptyRDD[Row]
        spark.createDataFrame(emptyRDD, schema)
      case Right(None) =>
        throw new IllegalStateException(s"No data for transient table '$tableName' for '$infoDate'")
    }
  }
}

Expand Down

0 comments on commit 10943bb

Please sign in to comment.