Skip to content

Commit

Permalink
Refactor validation spec (#153)
Browse files Browse the repository at this point in the history
Signed-off-by: khorshuheng <[email protected]>

Co-authored-by: khorshuheng <[email protected]>
  • Loading branch information
khorshuheng and khorshuheng authored Jun 27, 2022
1 parent 58ebe0f commit 66ace80
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package feast.ingestion

import feast.ingestion.Modes.Modes
import feast.ingestion.validation.Expectation
import org.joda.time.DateTime

object Modes extends Enumeration {
Expand Down Expand Up @@ -124,11 +125,6 @@ case class ExpectationSpec(
expectations: List[Expectation]
)

case class Expectation(
expectationType: String,
kwargs: Map[String, String]
)

case class IngestionJobConfig(
mode: Modes = Modes.Offline,
featureTable: FeatureTable = null,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2022 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.ingestion.validation

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, lit}
import org.json4s.{CustomSerializer, DefaultFormats, Extraction, Formats, JObject, JValue}

trait Expectation {

def validate: Column
}

case class ExpectColumnValuesToNotBeNull(columnName: String) extends Expectation {
override def validate: Column = col(columnName).isNotNull
}

case class ExpectColumnValuesToBeBetween(
columnName: String,
minValue: Option[Int],
maxValue: Option[Int]
) extends Expectation {
override def validate: Column = {
(minValue, maxValue) match {
case (Some(min), Some(max)) => col(columnName).between(min, max)
case (Some(min), None) => col(columnName).>=(min)
case (None, Some(max)) => col(columnName).<=(max)
case _ => lit(true)
}
}
}

object Expectation {
implicit val format: Formats = DefaultFormats

def extractColumn(kwargs: JValue): String = {
(kwargs \ "column").extract[String]
}

def apply(expectationType: String, kwargs: JValue): Expectation = {
expectationType match {
case "expect_column_values_to_not_be_null" =>
ExpectColumnValuesToNotBeNull(extractColumn(kwargs))
case "expect_column_values_to_be_between" =>
val column = extractColumn(kwargs)
val minValue = (kwargs \ "minValue").toSome.map(_.extract[Int])
val maxValue = (kwargs \ "maxValue").toSome.map(_.extract[Int])
ExpectColumnValuesToBeBetween(column, minValue, maxValue)
}
}
}

object ExpectationCodec
extends CustomSerializer[Expectation](implicit format =>
(
{ case x: JObject =>
val eType: String = (x \ "expectationType").extract[String]
val kwargs: JValue = (x \ "kwargs")
Expectation(eType, kwargs)
},
{ case x: Expectation =>
Extraction.decompose(x)
}
)
)
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package feast.ingestion.validation

import feast.ingestion.{FeatureTable, ExpectationSpec, Expectation}
import feast.ingestion.{FeatureTable, ExpectationSpec}
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, lit}

Expand All @@ -35,32 +35,12 @@ class RowValidator(
def timestampPresent: Column =
col(timestampColumn).isNotNull

def expectColumnValuesToBeBetween(expectation: Expectation): Column = {
val minValue: Option[String] = expectation.kwargs.get("minValue")
val maxValue: Option[String] = expectation.kwargs.get("maxValue")

(minValue, maxValue) match {
case (Some(min), Some(max)) => col(expectation.kwargs("column")).between(min, max)
case (Some(min), None) => col(expectation.kwargs("column")).>=(min)
case (None, Some(max)) => col(expectation.kwargs("column")).<=(max)
case _ => lit(true)
}
}

def validate(expectation: Expectation): Column = {
expectation.expectationType match {
case "expect_column_values_to_not_be_null" => col(expectation.kwargs("column")).isNotNull
case "expect_column_values_to_be_between" => expectColumnValuesToBeBetween(expectation)
case _ => lit(true)
}
}

def validationChecks: Column = {

expectationSpec match {
case Some(value) if value.expectations.isEmpty => lit(true)
case Some(value) =>
value.expectations.map(expectation => validate(expectation)).reduce(_.&&(_))
value.expectations.map(_.validate).reduce(_.&&(_))
case None => lit(true)
}
}
Expand Down

0 comments on commit 66ace80

Please sign in to comment.