Skip to content

Commit

Permalink
Merge/merging release 2.25 into develop ver 3 (#1963)
Browse files Browse the repository at this point in the history
* Update for next development version 2.24.0-SNAPSHOT

* Suppress download noise in license check

* Suppress compiler warning of obsolete Java (#1892)

* 1868 statistics with missing counts and datasets missing proprties (#1873)

* 1868 statistics with missing counts and datasets missing proprties

* 1843 Summary page for properties (#1880)

* 1843 Home page with properties,  side panel with missing counts and summary page for properties with tab containing datasets missing that particular property

* Feature/1603 mapping table filtering general (#1879)

* #1603 serde tests for CR and MT DataFrameFilters
(mongo-bson-based serde tests for CR and MT DataFrameFilters, mongo-bson-based serde tests extended for CR with a blank mappingTableFilter)

* #1909 Increase the limit of columns shown in menas column selection

* 1903 Add validation for complex default values in mapping tables on import

* Project config and management updates (#1908)

Project config and management updates
* poc issue template
* CODEOWNERS update
* developers update
* Badges to README.md

* 1881 HyperConformance enceladus_info_version from payload  (#1896)

1881 HyperConformance enceladus_info_version from payload

* #1887 defaultTimestampTimeZone can be source type specific (#1899)

#1887 defaultTimestampTimeZone can be source type specific
* `DefaultsByFormat` extends the `Defaults` trait, being able to read defaults from configuration files
* `DefaultsByFormat` offers further granularity by first checking the format specific setting only then taking the global one
* Basic `GlobalDefaults` are not configuration dependent anymore
* Standardization now user `DefaultsByFormat` for its defaults, where rawFormat is used for format parameter
* Switched to configuration path to be `enceladus.defaultTimestampTimeZone.default` and `enceladus.defaultTimestampTimeZone.[rawFormat]` respectively
* `defaultTimestampTimeZone` is still supported/read as an obsolete fallback
Co-authored-by: Daniel K <[email protected]>

* #1887 defaultTimestampTimeZone can be source type specific (#1916)

#1887 defaultTimestampTimeZone can be source type specific
* rename of the configuration prefix from `enceladus.` to `standardization.`

* #172 Save original timezone information in metadata file (#1900)

* Upgrade of Atum to 3.6.0
* Writing the default time zones for timestamps and dates into _INFO file

* #1894 `HadoopFsPersistenceFactory` - adding Spline S3 write support (#1912)

* #1894 Spline S3 support via custom persistence factory `HadoopFsPersistenceFactory`.
Co-authored-by: David Benedeki <[email protected]>

* Update versions for release v2.24.0

* Update for next development version 2.25.0-SNAPSHOT

* #1926 Add executor extra java opts to helper scripts

* #1931 Add switch for running kinit in helper scripts

* #1882 Update Cobrix dependency to v.2.3.0

* #1882 Remove explicit "collapse_root" since it is the default since Cobrix 2.3.0

* #1882 Update Cobrix to 2.4.1 and update Cobol test suite for ASCII files.

* #1882 Bump up Cobrix version to 2.4.2.

* #1927 Spline _LINEAGE and Atum _INFO files permission alignment (#1934)

* #1927 - testing setup: set both spline _LINEAGE and atum _INFO to hdfs file permissions 733 -> the result on EMR HDFS was 711 (due to 022 umask there) -> evidence of working

* #1927 - cleanup of test settings of 733 fs permissions

* #1927 Atum final version 3.7.0 used instead of the snapshot (same code)

* #1927 comment change

* #1927 - default 644 FS permissions for both _INFO and _LINEAGE files.

* 1937 limit output file size (#1941)

* 1937 limit output file size

* 1937 limit output file size

* 1937 renamings + constants

* 1937 more conditions

* 1937 rename params

* 1937 feedback + script params

* 1937 more feedback

* 1937 final feedback

* #1951: Windows Helper scripts - add missing features
* `ADDITIONAL_JVM_EXECUTOR_CONF`
* Kerberos configuration
* Trust store configuration
* kinit execution option
* `--min-processing-block-size` & `--max-processing-block-size`
* logo improvement

* * --min-processing-block-size -> --min-processing-partition-size
* --max-processing-block-size -> --max-processing-partition-size

* #1869: SparkJobs working with LoadBalanced Menas (#1935)

* `menas.rest.retryCount` - configuration, how many times an url should be retried if failing with retry-able error implemented
* `menas.rest.availability.setup` - configuration, how the url list should be handled
* _Standardization_, _Conformance_ and _HyperConformance_ changed to provide retry count and availability setup to Dao, read from configuration
* `ConfigReader` enhanced and unified to read configurations more easily and universally
* Mockito upgraded to 1.16.42

Co-authored-by: Daniel K <[email protected]>

* Feature/1863 mapping table filtering (#1929)

* #1863 mapping cr & mt fitler successfully reuses the same fragment (both using the same named model)
 - todo reuse validation, reuse manipulation methods

* #1863 FilterEdit.js allows reusing filterEdit TreeTable logic between mCR and MT editings

* #1863 mCT editing validation enabled (commons from FilterEdit.js)

* #1863 mCT datatype hinting hinting enabled (commons from DataTypeUtils.js)

* #1863 mCR/MT edit dialog default width=950px, some cleanup
* #1863 bugfixes: directly creating MT with filter (fix on accepting the field), UI fix for MT filter model initialization

* #1863 npm audit fix

* #1863 bugfix: adding new mCR (when no edit MCR dialog has been opened yet) did not work - fixed

* #1863 selecting mapping column from MT schema works (for all schema levels) for edit. TODO = Schema type support

 #1863 mCR - schema-based columns suggested for filter, value types filled in silently during submit, too.

* #1863 bugfix: empty MT - schema may be empty

* #1863 bugfix: removing a filter left a null node - cleanup was needed (otherwise view would fail)
logging cleanup

* #1863 select list item now shows valueType as additionalText, cleanup

* #1863 nonEmptyAndNonNullFilled - map->filter bug fixed.

* #1863 typo for null filter

Co-authored-by: David Benedeki <[email protected]>

* Update versions for release v2.25.0

* [merge] build fix

* [merge] npm audit fix

* [merge] npm audit fix

* [merge] buildfix (menas->rest_api packaging fix)

* [merge] review updates

Co-authored-by: David Benedeki <[email protected]>
Co-authored-by: Saša Zejnilović <[email protected]>
Co-authored-by: David Benedeki <[email protected]>
Co-authored-by: Adrian Olosutean <[email protected]>
Co-authored-by: Ruslan Iushchenko <[email protected]>
  • Loading branch information
6 people authored Nov 4, 2021
1 parent facb46a commit 10213a2
Show file tree
Hide file tree
Showing 66 changed files with 6,412 additions and 3,715 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,43 +18,75 @@ package za.co.absa.enceladus.dao.rest
import org.apache.commons.lang.exception.ExceptionUtils
import org.slf4j.LoggerFactory
import org.springframework.web.client.{ResourceAccessException, RestClientException}
import za.co.absa.enceladus.dao.rest.CrossHostApiCaller.logger
import za.co.absa.enceladus.dao.{DaoException, RetryableException}

import scala.annotation.tailrec
import scala.util.{Failure, Random, Try}

protected object CrossHostApiCaller {
object CrossHostApiCaller {

def apply(apiBaseUrls: List[String]): CrossHostApiCaller = {
new CrossHostApiCaller(apiBaseUrls, Random.nextInt(apiBaseUrls.size))
private val logger = LoggerFactory.getLogger(classOf[CrossHostApiCaller])

final val DefaultUrlsRetryCount: Int = 0

private def createInstance(apiBaseUrls: Seq[String], urlsRetryCount: Int, startWith: Option[Int]): CrossHostApiCaller = {
val maxTryCount: Int = (if (urlsRetryCount < 0) {
logger.warn(s"Urls retry count cannot be negative ($urlsRetryCount). Using default number of retries instead ($DefaultUrlsRetryCount).") //scalastyle:ignore maxLineLength
DefaultUrlsRetryCount
} else {
urlsRetryCount
}) + 1
val currentHostIndex = startWith.getOrElse(Random.nextInt(Math.max(apiBaseUrls.size, 1)))
new CrossHostApiCaller(apiBaseUrls.toVector, maxTryCount, currentHostIndex)
}

def apply(apiBaseUrls: Seq[String], urlsRetryCount: Int = DefaultUrlsRetryCount, startWith: Option[Int] = None): CrossHostApiCaller = {
createInstance(apiBaseUrls, urlsRetryCount, startWith)
}
}

protected class CrossHostApiCaller(apiBaseUrls: List[String], var currentHostIndex: Int) extends ApiCaller {
private val logger = LoggerFactory.getLogger(this.getClass)
protected class CrossHostApiCaller private(apiBaseUrls: Vector[String], maxTryCount: Int, private var currentHostIndex: Int)
extends ApiCaller {

def baseUrlsCount: Int = apiBaseUrls.size

def currentBaseUrl: String = apiBaseUrls(currentHostIndex)

def nextBaseUrl(): String = {
currentHostIndex = (currentHostIndex + 1) % baseUrlsCount
currentBaseUrl
}

private val maxAttempts = apiBaseUrls.size - 1

def call[T](fn: String => T): T = {
def logFailure(error: Throwable, url: String, attemptNumber: Int, nextUrl: Option[String]): Unit = {
val rootCause = ExceptionUtils.getRootCauseMessage(error)
val switching = nextUrl.map(s => s", switching host to $s").getOrElse("")
logger.warn(s"Request failed on host $url (attempt $attemptNumber of $maxTryCount)$switching - $rootCause")
}

def attempt(index: Int, attemptCount: Int = 0): Try[T] = {
currentHostIndex = index
val currentBaseUrl = apiBaseUrls(index)
Try {
fn(currentBaseUrl)
@tailrec
def attempt(url: String, attemptNumber: Int, urlsTried: Int): Try[T] = {
val result =Try {
fn(url)
}.recoverWith {
case e @ (_: ResourceAccessException | _: RestClientException) => Failure(DaoException("Server non-responsive", e))
}.recoverWith {
case e: RetryableException if attemptCount < maxAttempts =>
val nextIndex = (index + 1) % apiBaseUrls.size
val nextBaseUrl = apiBaseUrls(nextIndex)
val rootCause = ExceptionUtils.getRootCauseMessage(e)
logger.warn(s"Request failed on host $currentBaseUrl, switching host to $nextBaseUrl - $rootCause")
attempt(nextIndex, attemptCount + 1)
}
//using match instead of recoverWith to make the function @tailrec
result match {
case Failure(e: RetryableException) if attemptNumber < maxTryCount =>
logFailure(e, url, attemptNumber, None)
attempt(url, attemptNumber + 1, urlsTried)
case Failure(e: RetryableException) if urlsTried < baseUrlsCount =>
val nextUrl = nextBaseUrl()
logFailure(e, url, attemptNumber, Option(nextUrl))
attempt(nextUrl, 1, urlsTried + 1)
case _ => result
}
}

attempt(currentHostIndex).get
attempt(currentBaseUrl,1, 1).get
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ object MenasConnectionStringParser {
.replaceAll("/$", "")
.replaceAll("/api$", "")
)
.toSet
.distinct
.toList
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,33 @@
package za.co.absa.enceladus.dao.rest

import za.co.absa.enceladus.dao.auth.MenasCredentials
import za.co.absa.enceladus.dao.rest.RestDaoFactory.AvailabilitySetup.{Fallback, AvailabilitySetup, RoundRobin}

object RestDaoFactory {

object AvailabilitySetup extends Enumeration {
final type AvailabilitySetup = Value

final val RoundRobin = Value("roundrobin")
final val Fallback = Value("fallback")
}

final val DefaultAvailabilitySetup: AvailabilitySetup = RoundRobin

private val restTemplate = RestTemplateSingleton.instance

def getInstance(authCredentials: MenasCredentials, apiBaseUrls: List[String]): MenasRestDAO = {
val apiCaller = CrossHostApiCaller(apiBaseUrls)
def getInstance(authCredentials: MenasCredentials,
apiBaseUrls: List[String],
urlsRetryCount: Option[Int] = None,
menasSetup: AvailabilitySetup = DefaultAvailabilitySetup): MenasRestDAO = {
val startsWith = if (menasSetup == Fallback) {
Option(0)
} else {
None
}
val apiCaller = CrossHostApiCaller(apiBaseUrls, urlsRetryCount.getOrElse(CrossHostApiCaller.DefaultUrlsRetryCount), startsWith)
val authClient = AuthClient(authCredentials, apiCaller)
val restClient = new RestClient(authClient, restTemplate)
new MenasRestDAO(apiCaller, restClient)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
mock-maker-inline
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package za.co.absa.enceladus.dao.rest

import org.mockito.Mockito
import org.springframework.web.client.ResourceAccessException
import za.co.absa.enceladus.dao.rest.CrossHostApiCaller.DefaultUrlsRetryCount
import za.co.absa.enceladus.dao.{DaoException, UnauthorizedException}

class CrossHostApiCallerSuite extends BaseTestSuite {
Expand All @@ -27,12 +28,23 @@ class CrossHostApiCallerSuite extends BaseTestSuite {
Mockito.reset(restClient)
}

"CrossHostApiCaller" should {
"cycle through urls" in {
val crossHostApiCaller = CrossHostApiCaller(Vector("a", "b", "c", "d"), DefaultUrlsRetryCount, startWith = Some(1))
crossHostApiCaller.nextBaseUrl() should be("c")
crossHostApiCaller.nextBaseUrl() should be("d")
crossHostApiCaller.nextBaseUrl() should be("a")
crossHostApiCaller.nextBaseUrl() should be("b")
crossHostApiCaller.nextBaseUrl() should be("c")
}
}

"CrossHostApiCaller::call" should {
"return the result of the first successful call" when {
"there are no failures" in {
Mockito.when(restClient.sendGet[String]("a")).thenReturn("success")

val result = new CrossHostApiCaller(List("a", "b", "c"), 0).call { str =>
val result = CrossHostApiCaller(Vector("a", "b", "c"), DefaultUrlsRetryCount, startWith = Some(0)).call { str =>
restClient.sendGet[String](str)
}

Expand All @@ -42,16 +54,33 @@ class CrossHostApiCallerSuite extends BaseTestSuite {

"only some calls fail with a retryable exception" in {
Mockito.when(restClient.sendGet[String]("a")).thenThrow(DaoException("Something went wrong A"))
Mockito.when(restClient.sendGet[String]("b")).thenReturn("success")
Mockito.when(restClient.sendGet[String]("b"))
.thenThrow(DaoException("Something went wrong B"))
.thenReturn("success")

val result = new CrossHostApiCaller(List("a", "b", "c"), 0).call { str =>
val result = CrossHostApiCaller(Vector("a", "b", "c"), 2, Some(0)).call { str =>
restClient.sendGet[String](str)
}

result should be("success")
Mockito.verify(restClient, Mockito.times(3)).sendGet[String]("a")
Mockito.verify(restClient, Mockito.times(2)).sendGet[String]("b")
Mockito.verify(restClient, Mockito.never()).sendGet[String]("c")
}

"despite retry count is negative" in {
Mockito.when(restClient.sendGet[String]("a")).thenThrow(DaoException("Something went wrong A"))
Mockito.when(restClient.sendGet[String]("b")).thenThrow(DaoException("Something went wrong B"))
Mockito.when(restClient.sendGet[String]("c")).thenReturn("success")

val result = CrossHostApiCaller(Vector("a", "b", "c"), -2, Some(0)).call { str =>
restClient.sendGet[String](str)
}

result should be("success")
Mockito.verify(restClient, Mockito.times(1)).sendGet[String]("a")
Mockito.verify(restClient, Mockito.times(1)).sendGet[String]("b")
Mockito.verify(restClient, Mockito.never()).sendGet[String]("c")
Mockito.verify(restClient, Mockito.times(1)).sendGet[String]("c")
}
}

Expand All @@ -62,7 +91,7 @@ class CrossHostApiCallerSuite extends BaseTestSuite {
Mockito.when(restClient.sendGet[String]("c")).thenThrow(DaoException("Something went wrong C"))

val exception = intercept[DaoException] {
new CrossHostApiCaller(List("a", "b", "c"), 0).call { str =>
CrossHostApiCaller(Vector("a", "b", "c"), 0, Some(0)).call { str =>
restClient.sendGet[String](str)
}
}
Expand All @@ -73,12 +102,29 @@ class CrossHostApiCallerSuite extends BaseTestSuite {
Mockito.verify(restClient, Mockito.times(1)).sendGet[String]("c")
}

"all calls fail with a retryable exception over multiple attempts" in {
Mockito.when(restClient.sendGet[String]("a")).thenThrow(DaoException("Something went wrong A"))
Mockito.when(restClient.sendGet[String]("b")).thenThrow(DaoException("Something went wrong B"))
Mockito.when(restClient.sendGet[String]("c")).thenThrow(DaoException("Something went wrong C"))

val exception = intercept[DaoException] {
CrossHostApiCaller(Vector("a", "b", "c"), 1, Some(0)).call { str =>
restClient.sendGet[String](str)
}
}

exception.getMessage should be("Something went wrong C")
Mockito.verify(restClient, Mockito.times(2)).sendGet[String]("a")
Mockito.verify(restClient, Mockito.times(2)).sendGet[String]("b")
Mockito.verify(restClient, Mockito.times(2)).sendGet[String]("c")
}

"any call fails with a non-retryable exception" in {
Mockito.when(restClient.sendGet[String]("a")).thenThrow(new ResourceAccessException("Something went wrong A"))
Mockito.when(restClient.sendGet[String]("b")).thenThrow(UnauthorizedException("Wrong credentials"))

val exception = intercept[UnauthorizedException] {
new CrossHostApiCaller(List("a", "b", "c"), 0).call { str =>
CrossHostApiCaller(Vector("a", "b", "c"), 0, Some(0)).call { str =>
restClient.sendGet[String](str)
}
}
Expand All @@ -89,6 +135,17 @@ class CrossHostApiCallerSuite extends BaseTestSuite {
Mockito.verify(restClient, Mockito.never()).sendGet[String]("c")
}
}

"fail on not having Urls" when {
"none are provided" in {
val exception = intercept[IndexOutOfBoundsException] {
CrossHostApiCaller(Vector()).call { str =>
restClient.sendGet[String](str)
}
}
exception.getMessage should be ("0")
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,24 @@ class MenasConnectionStringParserSuite extends BaseTestSuite {
exception.getMessage should be("Malformed Menas connection string")
}
}
}

"keep the order of urls" when {
val expectedList = List(
"http://host1:8080/menas",
"http://host2:9000/menas",
"http://host3:8080/menas",
"http://host4:9000/menas",
"http://localhost:8080/menas",
"http://localhost:8090/menas"
)
"they are full fledged urls separated by semicolon" in {
val result = MenasConnectionStringParser.parse("http://host1:8080/menas;http://host2:9000/menas;http://host3:8080/menas;http://host4:9000/menas;http://localhost:8080/menas;http://localhost:8090/menas")
result should be(expectedList)
}
"varied hosts separated by comma within one url" in {
val result = MenasConnectionStringParser.parse("http://host1:8080,host2:9000,host3:8080,host4:9000,localhost:8080,localhost:8090/menas")
result should be(expectedList)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@

package za.co.absa.enceladus.dao.rest

import org.mockito.MockitoSugar.withObjectMocked
import org.mockito.{ArgumentMatchersSugar, Mockito}
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
import za.co.absa.enceladus.dao.UnauthorizedException
import za.co.absa.enceladus.dao.auth.{InvalidMenasCredentials, MenasKerberosCredentials, MenasPlainCredentials}
import za.co.absa.enceladus.dao.rest.RestDaoFactory.AvailabilitySetup

class RestDaoFactorySuite extends AnyWordSpec with Matchers {
class RestDaoFactorySuite extends AnyWordSpec with Matchers with ArgumentMatchersSugar {

private val menasApiBaseUrls = List("http://localhost:8080/menas/api")

Expand All @@ -47,12 +50,50 @@ class RestDaoFactorySuite extends AnyWordSpec with Matchers {
exception.getMessage should be("No Menas credentials provided")
}
}
"properly adjusts the starting URL based on the setup type " when {
val fooCrossHostApiCaller = CrossHostApiCaller(Seq.empty)
val plainCredentials = MenasPlainCredentials("user", "changeme")
"when it's round-robin" in {
withObjectMocked[CrossHostApiCaller.type] {
Mockito.when(CrossHostApiCaller.apply(any[Seq[String]], any[Int], any[Option[Int]])).thenReturn(fooCrossHostApiCaller)
val restDao = RestDaoFactory.getInstance(plainCredentials, menasApiBaseUrls)
getAuthClient(restDao.restClient).getClass should be(classOf[LdapAuthClient])
Mockito.verify(CrossHostApiCaller, Mockito.times(1)).apply(
menasApiBaseUrls,
CrossHostApiCaller.DefaultUrlsRetryCount,
None)
}
}
"when it's fallback" in {
withObjectMocked[CrossHostApiCaller.type] {
Mockito.when(CrossHostApiCaller.apply(any[Seq[String]], any[Int], any[Option[Int]])).thenReturn(fooCrossHostApiCaller)
val plainCredentials = MenasPlainCredentials("user", "changeme")
val restDao = RestDaoFactory.getInstance(plainCredentials, menasApiBaseUrls, None, AvailabilitySetup.Fallback)
getAuthClient(restDao.restClient).getClass should be(classOf[LdapAuthClient])
Mockito.verify(CrossHostApiCaller, Mockito.times(1)).apply(
menasApiBaseUrls,
CrossHostApiCaller.DefaultUrlsRetryCount,
Option(0))
}
}
"when the setup type is not specified" in {
withObjectMocked[CrossHostApiCaller.type] {
Mockito.when(CrossHostApiCaller.apply(any[Seq[String]], any[Int], any[Option[Int]])).thenReturn(fooCrossHostApiCaller)
val restDao = RestDaoFactory.getInstance(plainCredentials, menasApiBaseUrls)
getAuthClient(restDao.restClient).getClass should be(classOf[LdapAuthClient])
Mockito.verify(CrossHostApiCaller, Mockito.times(1)).apply(
menasApiBaseUrls,
CrossHostApiCaller.DefaultUrlsRetryCount,
None)
}
}
}
}

private def getAuthClient(restClient: RestClient): AuthClient = {
val field = classOf[RestClient].getDeclaredField("authClient")
field.setAccessible(true)
field.get(restClient).asInstanceOf[AuthClient]
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class DataFrameFilterSuite extends AnyFunSuite {
assert(filterExpr2.semanticEquals(expected))
}

test("Three filters joined with an and condidion") {
test("Three filters joined with an and condition") {
val f1 = DiffersFilter("column1", "v1")
val f2 = DiffersFilter("column2", "v2")
val f3 = DiffersFilter("column3", "v3")
Expand Down
Loading

0 comments on commit 10213a2

Please sign in to comment.