Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#399 Add ability to use date expressions and formatting in jdbc native sql expressions #401

Merged
merged 2 commits into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1929,6 +1929,41 @@ Here is an example configuration for a JDBC source:
}
```

You can use date expressions and formatted dates in sql expressions. You can wrap date expressions in `@{}` and use
variables like `@infoDate` and date functions referenced below inside curly braces. And you can apply formatting to variables
using `%format%` (like `%yyyy-MM-dd%`) after variables or expressions.
Examples:

For
```hocon
sql = "SELECT * FROM my_table_@infoDate%yyyyMMdd% WHERE a = b"
```
the result would look like:
```sql
SELECT * FROM my_table_20220218 WHERE a = b
```

For
```hocon
sql = "SELECT * FROM my_table WHERE snapshot_date = date'@{beginOfMonth(minusMonths(@infoDate, 1))}'"
```
the result would look like:
```sql
-- the beginning of the previous month
SELECT * FROM my_table WHERE snapshot_date = date'2022-01-01'
```

For
```hocon
sql = "SELECT * FROM my_table_@{plusMonths(@infoDate, 1)}%yyyyMMdd% WHERE a = b"
```
the result would look like:
```sql
SELECT * FROM my_table_20220318 WHERE a = b
-- ^the month is 3 (next month)
```


The above example also shows how you can add a pre-ingestion validation on the number of records in the table
using `minimum.records` parameter.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class TableReaderJdbc(jdbcReaderConfig: TableReaderJdbcConfig,
}

private[core] def getCountForSql(sql: String, infoDateBegin: LocalDate, infoDateEnd: LocalDate): Long = {
val filteredSql = TableReaderJdbcNative.getFilteredSql(sql, infoDateBegin, infoDateEnd, infoDateFormatter)
val filteredSql = TableReaderJdbcNative.getFilteredSql(sql, infoDateBegin, infoDateEnd)
getWithRetry[Long](filteredSql, isDataQuery = false, jdbcRetries, None)(df => df.count())
}

Expand All @@ -129,7 +129,7 @@ class TableReaderJdbc(jdbcReaderConfig: TableReaderJdbcConfig,
}

private[core] def getDataForSql(sql: String, infoDateBegin: LocalDate, infoDateEnd: LocalDate, columns: Seq[String]): DataFrame = {
val filteredSql = TableReaderJdbcNative.getFilteredSql(sql, infoDateBegin, infoDateEnd, infoDateFormatter)
val filteredSql = TableReaderJdbcNative.getFilteredSql(sql, infoDateBegin, infoDateEnd)
getWithRetry[DataFrame](filteredSql, isDataQuery = true, jdbcRetries, None)(df => filterDfColumns(df, columns))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
import org.slf4j.LoggerFactory
import za.co.absa.pramen.api.Query
import za.co.absa.pramen.core.reader.model.TableReaderJdbcConfig
import za.co.absa.pramen.core.utils.{JdbcNativeUtils, TimeUtils}
import za.co.absa.pramen.core.utils.{JdbcNativeUtils, StringUtils, TimeUtils}

import java.time.format.DateTimeFormatter
import java.time.{Instant, LocalDate}
Expand All @@ -45,7 +45,7 @@ class TableReaderJdbcNative(jdbcReaderConfig: TableReaderJdbcConfig,

override def getRecordCount(query: Query, infoDateBegin: LocalDate, infoDateEnd: LocalDate): Long = {
val start = Instant.now()
val sql = getFilteredSql(getSqlExpression(query), infoDateBegin, infoDateEnd, infoDateFormatter)
val sql = getFilteredSql(getSqlExpression(query), infoDateBegin, infoDateEnd)
log.info(s"JDBC Native count of: $sql")
val count = JdbcNativeUtils.getJdbcNativeRecordCount(jdbcConfig, url, sql)
val finish = Instant.now()
Expand All @@ -57,7 +57,7 @@ class TableReaderJdbcNative(jdbcReaderConfig: TableReaderJdbcConfig,
override def getData(query: Query, infoDateBegin: LocalDate, infoDateEnd: LocalDate, columns: Seq[String]): DataFrame = {
log.info(s"JDBC Native data of: $query")
query match {
case Query.Sql(sql) => getDataFrame(getFilteredSql(sql, infoDateBegin, infoDateEnd, infoDateFormatter))
case Query.Sql(sql) => getDataFrame(getFilteredSql(sql, infoDateBegin, infoDateEnd))
case Query.Table(table) => getDataFrame(getSqlDataQuery(table, infoDateBegin, infoDateEnd, columns))
case other => throw new IllegalArgumentException(s"'${other.name}' is not supported by the JDBC Native reader. Use 'sql' or 'table' instead.")
}
Expand Down Expand Up @@ -101,13 +101,12 @@ object TableReaderJdbcNative {
new TableReaderJdbcNative(tableReaderJdbc, urlSelector, conf)
}

def getFilteredSql(sqlExpression: String, infoDateBegin: LocalDate, infoDateEnd: LocalDate, formatter: DateTimeFormatter): String = {
sqlExpression
.replaceAll("@dateFrom", formatter.format(infoDateBegin))
.replaceAll("@dateTo", formatter.format(infoDateEnd))
.replaceAll("@date", formatter.format(infoDateEnd))
.replaceAll("@infoDateBegin", formatter.format(infoDateBegin))
.replaceAll("@infoDateEnd", formatter.format(infoDateEnd))
.replaceAll("@infoDate", formatter.format(infoDateEnd))
def getFilteredSql(sqlExpression: String, infoDateBegin: LocalDate, infoDateEnd: LocalDate): String = {
val f1 = StringUtils.replaceFormattedDateExpression(sqlExpression, "dateFrom", infoDateBegin)
val f2 = StringUtils.replaceFormattedDateExpression(f1, "dateTo", infoDateEnd)
val f3 = StringUtils.replaceFormattedDateExpression(f2, "date", infoDateEnd)
val f4 = StringUtils.replaceFormattedDateExpression(f3, "infoDateBegin", infoDateBegin)
val f5 = StringUtils.replaceFormattedDateExpression(f4, "infoDateEnd", infoDateEnd)
StringUtils.replaceFormattedDateExpression(f5, "infoDate", infoDateEnd)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -194,11 +194,11 @@ object SparkUtils {

def applyFilters(df: DataFrame, filters: Seq[String], infoDate: LocalDate, dateFrom: LocalDate, dateTo: LocalDate): DataFrame = {
filters.foldLeft(df)((df, filter) => {
val actualFilter = filter
.replaceAll("@dateFrom", s"${dateFrom.toString}")
.replaceAll("@dateTo", s"${dateTo.toString}")
.replaceAll("@date", s"${infoDate.toString}")
.replaceAll("@infoDate", s"date'${infoDate.toString}'")
val f1 = StringUtils.replaceFormattedDateExpression(filter, "dateFrom", dateFrom)
val f2 = StringUtils.replaceFormattedDateExpression(f1, "dateTo", dateTo)
val f3 = StringUtils.replaceFormattedDateExpression(f2, "date", infoDate)
val actualFilter = f3.replaceAll("@infoDate", s"date'${infoDate.toString}'")

log.info(s"Applying filter: $actualFilter")
df.filter(expr(actualFilter))
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@

package za.co.absa.pramen.core.utils

import za.co.absa.pramen.core.exceptions.{OsSignalException, ThreadStackTrace}
import za.co.absa.pramen.core.exceptions.ThreadStackTrace
import za.co.absa.pramen.core.expr.DateExprEvaluator

import java.time.LocalDate
import java.time.format.DateTimeFormatter
import java.util.{Base64, StringTokenizer}
import scala.compat.Platform.EOL
import scala.util.control.NonFatal
Expand Down Expand Up @@ -193,4 +195,131 @@ object StringUtils {
base + details
}

/**
* Replaces a template with date substitution.
*
* For example, given
* {{{
* SELECT * FROM my_table_@date%yyyyMMdd% WHERE a = b
* }}}
* and date is '2022-02-18' the result is:
* {{{
* SELECT * FROM my_table_20220218 WHERE a = b
* }}}
*
* and with date substitution:
* {{{
* SELECT * FROM my_table_@{plusMonths(@date, 1)}%yyyyMMdd% WHERE a = b
* }}}
* the result is
* {{{
* SELECT * FROM my_table_20220318 WHERE a = b
* }}}
*
*
* @param template A template to replace variablesin.
* @param dateVar A variable name for the date (does not include '@') - case sensitive.
* @param date The date to replace the the variable with.
* @return The processed template.
*/
def replaceFormattedDateExpression(template: String, dateVar: String, date: LocalDate): String = {
val output = new StringBuilder()
val outputPartial = new StringBuilder()
val outputExpression = new StringBuilder()
var state = 0
var i = 0
var j = 0

val STATE_TEMPLATE_AS_IS = 0
val CATCH_VARIABLE = 1
val END_OF_VARIABLE = 2
val END_OF_FORMAT = 3
val DATE_EXPRESSION = 4

val expr = new DateExprEvaluator
expr.setValue(dateVar, date)

while (i < template.length) {
val c = template(i)
state match {
case STATE_TEMPLATE_AS_IS =>
if (c == '@') {
outputExpression.clear()
outputPartial.clear()
if (i < template.length - 2 && template(i + 1) == '{') {
i += 1
state = DATE_EXPRESSION
} else {
state = CATCH_VARIABLE
j = 0
outputPartial.append(s"$c")
}
} else {
output.append(s"$c")
}
case CATCH_VARIABLE =>
outputPartial.append(s"$c")
if (c == dateVar(j)) {
j += 1
if (j == dateVar.length) {
state = END_OF_VARIABLE
if (i == template.length - 1) {
output.append(s"$date")
}
}
} else {
output.append(s"${outputPartial.toString()}")
outputPartial.clear()
state = STATE_TEMPLATE_AS_IS
}
case END_OF_VARIABLE =>
if (c == '%') {
state = END_OF_FORMAT
outputPartial.clear()
} else {
if (outputExpression.nonEmpty) {
val calculatedDate = expr.evalDate(outputExpression.toString())
output.append(s"$calculatedDate$c")
} else {
output.append(s"$date$c")
}
state = STATE_TEMPLATE_AS_IS
}
case END_OF_FORMAT =>
if (c == '%') {
state = STATE_TEMPLATE_AS_IS
if (outputExpression.nonEmpty) {
val calculatedDate = expr.evalDate(outputExpression.toString())
val formatter = DateTimeFormatter.ofPattern(outputPartial.toString())
output.append(s"${formatter.format(calculatedDate)}")
} else {
val formatter = DateTimeFormatter.ofPattern(outputPartial.toString())
output.append(s"${formatter.format(date)}")
}
} else {
outputPartial.append(s"$c")
}
case DATE_EXPRESSION =>
if (c == '}') {
state = END_OF_VARIABLE
if (i == template.length - 1) {
val calculatedDate = expr.evalDate(outputExpression.toString())
output.append(s"$calculatedDate")
}
} else {
outputExpression.append(s"$c")
}

}
i += 1
}
if (state == DATE_EXPRESSION) {
throw new IllegalArgumentException(s"No matching '{' in the date expression: $template")
}
if (state == END_OF_FORMAT) {
throw new IllegalArgumentException(s"No matching '%' in the formatted date expression: $template")
}
output.toString()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -227,4 +227,124 @@ class StringUtilsSuite extends AnyWordSpec {
}
}

"replaceFormattedDateExpression" should {
val infoDate = LocalDate.of(2022, 2, 18)

"work with normal variables" in {
val template = "SELECT @dat FROM my_table_@date + 1"

val replaced = replaceFormattedDateExpression(template, "date", infoDate)

assert(replaced == "SELECT @dat FROM my_table_2022-02-18 + 1")
}

"work with variables at the end" in {
val template = "SELECT @dat FROM my_table_@date"

val replaced = replaceFormattedDateExpression(template, "date", infoDate)

assert(replaced == "SELECT @dat FROM my_table_2022-02-18")
}

"work with just variables" in {
val template = "@date"

val replaced = replaceFormattedDateExpression(template, "date", infoDate)

assert(replaced == "2022-02-18")
}

"work with 2 variables" in {
val template = "@date @date"

val replaced = replaceFormattedDateExpression(template, "date", infoDate)

assert(replaced == "2022-02-18 2022-02-18")
}

"work with formatted variables" in {
val template = "SELECT * FROM my_table_@date%yyyyMMdd% WHERE a = b"

val replaced = replaceFormattedDateExpression(template, "date", infoDate)

assert(replaced == "SELECT * FROM my_table_20220218 WHERE a = b")
}

"work with just formatted variables" in {
val template = "@date%yyyyMMdd%"

val replaced = replaceFormattedDateExpression(template, "date", infoDate)

assert(replaced == "20220218")
}

"work with 2 formatted variables" in {
val template = "@date%yyyyMMdd%@date%ddMMyyy%"

val replaced = replaceFormattedDateExpression(template, "date", infoDate)

assert(replaced == "2022021818022022")
}

"work with partial formatter" in {
val template = "@date%yyyyMM%"

val replaced = replaceFormattedDateExpression(template, "date", infoDate)

assert(replaced == "202202")
}

"work with expressions" in {
val template = "my_table_@{@date + 1}"

val replaced = replaceFormattedDateExpression(template, "date", infoDate)

assert(replaced == "my_table_2022-02-19")
}

"work with formatted expressions" in {
val template = "my_table_@{@date + 1}%yyyyMM%"

val replaced = replaceFormattedDateExpression(template, "date", infoDate)

assert(replaced == "my_table_202202")
}

"work with formatted expressions 2" in {
val template = "SELECT * FROM my_table_@{plusMonths(@date, 1)}%yyyyMMdd% WHERE a = b"

val replaced = replaceFormattedDateExpression(template, "date", infoDate)

assert(replaced == "SELECT * FROM my_table_20220318 WHERE a = b")
}

"work with formatted expressions 3" in {
val template = "SELECT * FROM my_table WHERE snapshot_date = date'@{beginOfMonth(minusMonths(@infoDate, 1))}'"

val replaced = replaceFormattedDateExpression(template, "infoDate", infoDate)

assert(replaced == "SELECT * FROM my_table WHERE snapshot_date = date'2022-01-01'")
}

"throw an exception if format is incomplete" in {
val template = "SELECT * FROM my_table WHERE snapshot_date = date'@infoDate%yyyy-mm-dd'"

val ex = intercept[IllegalArgumentException] {
replaceFormattedDateExpression(template, "infoDate", infoDate)
}

assert(ex.getMessage.contains("No matching '%' in the formatted date expression: SELECT * FROM my_table WHERE snapshot_date = date'@infoDate%yyyy-mm-dd'"))
}

"throw an exception if the expression is incomplete" in {
val template = "SELECT * FROM my_table WHERE snapshot_date = date'@{beginOfMonth(minusMonths(@infoDate, 1))'"

val ex = intercept[IllegalArgumentException] {
replaceFormattedDateExpression(template, "infoDate", infoDate)
}

assert(ex.getMessage.contains("No matching '{' in the date expression: SELECT * FROM my_table WHERE snapshot_date = date'@{beginOfMonth(minusMonths(@infoDate, 1))'"))
}
}

}
Loading