diff --git a/core/asset/delete_asset_expr_test.go b/core/asset/delete_asset_expr_test.go index 5332acf0..395a583f 100644 --- a/core/asset/delete_asset_expr_test.go +++ b/core/asset/delete_asset_expr_test.go @@ -33,7 +33,7 @@ func TestDeleteAssetExpr_ToQuery(t *testing.T) { exprStr: asset.DeleteAssetExpr{ ExprStr: &esExpr, }, - want: `{"query":{"bool":{"should":[{"term":{"name":"John"}},{"bool":{"must_not":[{"terms":{"service":["test1","test2","test3"]}}]}}]}}}`, + want: `{"query":{"bool":{"should":[{"term":{"name":"John"}},{"bool":{"must_not":[{"terms":{"service.keyword":["test1","test2","test3"]}}]}}]}}}`, wantErr: false, }, { diff --git a/core/asset/service_test.go b/core/asset/service_test.go index a80b3a31..340cde15 100644 --- a/core/asset/service_test.go +++ b/core/asset/service_test.go @@ -448,7 +448,7 @@ func TestService_DeleteAsset(t *testing.T) { func TestService_DeleteAssets(t *testing.T) { dummyRequest := asset.DeleteAssetsRequest{ QueryExpr: `testing < now()`, - DryRun: false, + DryRun: true, } type testCase struct { Description string @@ -478,6 +478,7 @@ func TestService_DeleteAssets(t *testing.T) { ExpectAffectedRows: 11, ExpectErr: nil, }, + // TODO: add case when DryRun = false which regarding goroutine } for _, tc := range testCases { t.Run(tc.Description, func(t *testing.T) { diff --git a/internal/store/elasticsearch/discovery_repository_test.go b/internal/store/elasticsearch/discovery_repository_test.go index 77c57af3..ae492ee7 100644 --- a/internal/store/elasticsearch/discovery_repository_test.go +++ b/internal/store/elasticsearch/discovery_repository_test.go @@ -415,12 +415,13 @@ func TestDiscoveryRepositoryDeleteByQueryExpr(t *testing.T) { }) t.Run("should not return error on success", func(t *testing.T) { + currentTime := time.Now().UTC() ast := asset.Asset{ ID: "delete-id", Type: asset.TypeTable, Service: bigqueryService, URN: "some-urn", - RefreshedAt: time.Now(), + RefreshedAt: ¤tTime, } err = repo.Upsert(ctx, ast) @@ -461,14 +462,14 @@ func TestDiscoveryRepositoryDeleteByQueryExpr(t *testing.T) { Type: asset.TypeTable, Service: bigqueryService, URN: "urn1", - RefreshedAt: currentTime, + RefreshedAt: ¤tTime, } ast2 := asset.Asset{ ID: "id2", Type: asset.TypeTopic, Service: kafkaService, URN: "urn2", - RefreshedAt: currentTime, + RefreshedAt: ¤tTime, } cli, err := esTestServer.NewClient() require.NoError(t, err) diff --git a/internal/store/postgres/asset_repository.go b/internal/store/postgres/asset_repository.go index 85edb5ac..6090b925 100644 --- a/internal/store/postgres/asset_repository.go +++ b/internal/store/postgres/asset_repository.go @@ -614,14 +614,20 @@ func (r *AssetRepository) update(ctx context.Context, assetID string, newAsset, return asset.InvalidError{AssetID: assetID} } + currentTime := time.Now() + if newAsset.RefreshedAt != nil { + currentTime = *newAsset.RefreshedAt + } + if len(clog) == 0 { - if newAsset.RefreshedAt != oldAsset.RefreshedAt { - return r.client.RunWithinTx(ctx, func(tx *sqlx.Tx) error { - return r.updateAsset(ctx, tx, assetID, newAsset) - }) + if newAsset.RefreshedAt == nil || newAsset.RefreshedAt == oldAsset.RefreshedAt { + return nil } - return nil + return r.client.RunWithinTx(ctx, func(tx *sqlx.Tx) error { + newAsset.RefreshedAt = ¤tTime + return r.updateAsset(ctx, tx, assetID, newAsset) + }) } return r.client.RunWithinTx(ctx, func(tx *sqlx.Tx) error { @@ -632,7 +638,8 @@ func (r *AssetRepository) update(ctx context.Context, assetID string, newAsset, } newAsset.Version = newVersion newAsset.ID = oldAsset.ID - newAsset.UpdatedAt = *newAsset.RefreshedAt // current time + newAsset.UpdatedAt = currentTime + newAsset.RefreshedAt = ¤tTime if err := r.updateAsset(ctx, tx, assetID, newAsset); err != nil { return err @@ -661,10 +668,6 @@ func (r *AssetRepository) update(ctx context.Context, assetID string, newAsset, } func (r *AssetRepository) updateAsset(ctx context.Context, tx *sqlx.Tx, assetID string, newAsset *asset.Asset) error { - currentTime := time.Now() - if newAsset.RefreshedAt != nil { - currentTime = *newAsset.RefreshedAt - } query, args, err := sq.Update("assets"). Set("urn", newAsset.URN). Set("type", newAsset.Type). @@ -675,7 +678,7 @@ func (r *AssetRepository) updateAsset(ctx context.Context, tx *sqlx.Tx, assetID Set("url", newAsset.URL). Set("labels", newAsset.Labels). Set("updated_at", newAsset.UpdatedAt). - Set("refreshed_at", currentTime). + Set("refreshed_at", *newAsset.RefreshedAt). Set("updated_by", newAsset.UpdatedBy.ID). Set("version", newAsset.Version). Where(sq.Eq{"id": assetID}). diff --git a/internal/store/postgres/asset_repository_test.go b/internal/store/postgres/asset_repository_test.go index 075fe1ad..25539376 100644 --- a/internal/store/postgres/asset_repository_test.go +++ b/internal/store/postgres/asset_repository_test.go @@ -705,13 +705,15 @@ func (r *AssetRepositoryTestSuite) TestGetByURN() { } func (r *AssetRepositoryTestSuite) TestVersions() { + currentTime := time.Now().UTC() assetURN := uuid.NewString() + "urn-u-2-version" // v0.1 astVersioning := asset.Asset{ - URN: assetURN, - Type: "table", - Service: "bigquery", - UpdatedBy: r.users[1], + URN: assetURN, + Type: "table", + Service: "bigquery", + UpdatedBy: r.users[1], + RefreshedAt: ¤tTime, } id, err := r.repository.Upsert(r.ctx, &astVersioning) @@ -767,6 +769,7 @@ func (r *AssetRepositoryTestSuite) TestVersions() { Labels: map[string]string{"key1": "value1"}, Version: "0.5", UpdatedBy: r.users[1], + RefreshedAt: ¤tTime, } ast, err := r.repository.GetByID(r.ctx, astVersioning.ID) @@ -797,6 +800,7 @@ func (r *AssetRepositoryTestSuite) TestVersions() { Labels: map[string]string{"key1": "value1"}, Version: "0.5", UpdatedBy: r.users[1], + RefreshedAt: ¤tTime, } ast, err := r.repository.GetByVersionWithID(r.ctx, astVersioning.ID, "0.5") @@ -1066,11 +1070,14 @@ func (r *AssetRepositoryTestSuite) TestUpsert() { r.Run("on update", func() { r.Run("should not create nor updating the asset if asset is identical", func() { + currentTime := time.Now().UTC() ast := asset.Asset{ - URN: "urn-u-2", - Type: "table", - Service: "bigquery", - UpdatedBy: r.users[0], + URN: "urn-u-2", + Type: "table", + Service: "bigquery", + UpdatedBy: r.users[0], + RefreshedAt: ¤tTime, + Version: "0.1", } identicalAsset := ast @@ -1085,15 +1092,52 @@ func (r *AssetRepositoryTestSuite) TestUpsert() { identicalAsset.ID = id r.Equal(ast.ID, identicalAsset.ID) + r.Equal(ast.Version, identicalAsset.Version) + }) + + r.Run("should same asset version if asset only has different in RefreshedAt", func() { + currentTime := time.Now().UTC().AddDate(0, 0, -1) + ast := asset.Asset{ + URN: "urn-u-2", + Type: "table", + Service: "bigquery", + URL: "https://sample-url-old.com", + UpdatedBy: r.users[0], + RefreshedAt: ¤tTime, + Version: "0.1", + } + + id, err := r.repository.Upsert(r.ctx, &ast) + r.Require().NoError(err) + r.NotEmpty(id) + ast.ID = id + + updated := ast + currentTime2 := time.Now().UTC() + updated.RefreshedAt = ¤tTime2 + + id, err = r.repository.Upsert(r.ctx, &updated) + r.Require().NoError(err) + r.NotEmpty(id) + updated.ID = id + + r.Equal(ast.ID, updated.ID) + + actual, err := r.repository.GetByID(r.ctx, ast.ID) + r.NoError(err) + + r.Equal(updated.RefreshedAt, actual.RefreshedAt) + r.Equal(ast.Version, actual.Version) }) - r.Run("should update the asset if asset is not identical", func() { + r.Run("should update the asset version if asset is not identical", func() { ast := asset.Asset{ URN: "urn-u-2", Type: "table", Service: "bigquery", URL: "https://sample-url-old.com", UpdatedBy: r.users[0], + Version: "0.1", } id, err := r.repository.Upsert(r.ctx, &ast) @@ -1115,6 +1159,7 @@ func (r *AssetRepositoryTestSuite) TestUpsert() { r.NoError(err) r.Equal(updated.URL, actual.URL) + r.NotEqual(ast.Version, actual.Version) }) r.Run("should delete old owners if it does not exist on new asset", func() { diff --git a/internal/workermanager/discovery_worker.go b/internal/workermanager/discovery_worker.go index 10b1875e..e3109cc4 100644 --- a/internal/workermanager/discovery_worker.go +++ b/internal/workermanager/discovery_worker.go @@ -4,9 +4,10 @@ import ( "context" "encoding/json" "fmt" + "strings" + "github.com/goto/compass/core/asset" "github.com/goto/compass/pkg/worker" - "strings" ) //go:generate mockery --name=DiscoveryRepository -r --case underscore --with-expecter --structname DiscoveryRepository --filename discovery_repository_mock.go --output=./mocks diff --git a/pkg/queryexpr/es_expr_test.go b/pkg/queryexpr/es_expr_test.go index e4e83890..7ea0b795 100644 --- a/pkg/queryexpr/es_expr_test.go +++ b/pkg/queryexpr/es_expr_test.go @@ -51,13 +51,13 @@ func TestESExpr_ToQuery(t *testing.T) { { name: "in condition", expr: queryexpr.ESExpr(`service in ["test1","test2","test3"]`), - want: `{"query":{"terms":{"service":["test1","test2","test3"]}}}`, + want: `{"query":{"terms":{"service.keyword":["test1","test2","test3"]}}}`, wantErr: false, }, { name: "equals or not in condition", expr: queryexpr.ESExpr(`name == "John" || service not in ["test1","test2","test3"]`), - want: `{"query":{"bool":{"should":[{"term":{"name":"John"}},{"bool":{"must_not":[{"terms":{"service":["test1","test2","test3"]}}]}}]}}}`, + want: `{"query":{"bool":{"should":[{"term":{"name":"John"}},{"bool":{"must_not":[{"terms":{"service.keyword":["test1","test2","test3"]}}]}}]}}}`, wantErr: false, }, {