Skip to content

Commit

Permalink
Don't use spark to get count query.
Browse files Browse the repository at this point in the history
  • Loading branch information
VladimirRybalko committed Aug 13, 2024
1 parent 068a35f commit e68ebfb
Showing 1 changed file with 13 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.slf4j.LoggerFactory
import za.co.absa.pramen.api.Query
import za.co.absa.pramen.core.config.Keys
import za.co.absa.pramen.core.reader.model.TableReaderJdbcConfig
import za.co.absa.pramen.core.utils.{ConfigUtils, JdbcSparkUtils, TimeUtils}
import za.co.absa.pramen.core.utils.{ConfigUtils, JdbcNativeUtils, JdbcSparkUtils, TimeUtils}

import java.time.format.DateTimeFormatter
import java.time.{Instant, LocalDate}
Expand Down Expand Up @@ -109,8 +109,18 @@ class TableReaderJdbc(jdbcReaderConfig: TableReaderJdbcConfig,

private[core] def getCountForSql(sql: String, infoDateBegin: LocalDate, infoDateEnd: LocalDate): Long = {
val filteredSql = TableReaderJdbcNative.getFilteredSql(sql, infoDateBegin, infoDateEnd)
val countSql = s"SELECT COUNT(*) AS CNT FROM ($filteredSql)"
getWithRetry[Long](countSql, isDataQuery = false, jdbcRetries, None)(df => BigDecimal(df.collect()(0)(0).toString).toLong)
val countSql = s"SELECT COUNT(*) FROM ($filteredSql)"
var count = 0L

JdbcNativeUtils.withResultSet(jdbcUrlSelector, countSql, jdbcRetries) { rs =>
if (!rs.next())
throw new IllegalStateException(s"No rows returned by the count query: $countSql")
else {
count = rs.getLong(1)
}
}

count
}

private[core] def getDataForTable(tableName: String, infoDateBegin: LocalDate, infoDateEnd: LocalDate, columns: Seq[String]): DataFrame = {
Expand Down

0 comments on commit e68ebfb

Please sign in to comment.