feat: bq load using folder #5445

Open · wants to merge 1 commit into base: master
69 changes: 34 additions & 35 deletions warehouse/integrations/bigquery/bigquery.go
@@ -61,7 +61,7 @@
tableNameLimit = 127
)

-// maps datatype stored in rudder to datatype in bigquery
+// dataTypesMap maps datatype stored in rudder to datatype in bigquery
var dataTypesMap = map[string]bigquery.FieldType{
"boolean": bigquery.BooleanFieldType,
"int": bigquery.IntegerFieldType,
@@ -70,7 +70,7 @@
"datetime": bigquery.TimestampFieldType,
}

-// maps datatype in bigquery to datatype stored in rudder
+// dataTypesMapToRudder maps datatype in bigquery to datatype stored in rudder
var dataTypesMapToRudder = map[bigquery.FieldType]string{
"BOOLEAN": "boolean",
"BOOL": "boolean",
@@ -140,10 +140,8 @@
})
}

-func (bq *BigQuery) DeleteTable(ctx context.Context, tableName string) (err error) {
-	tableRef := bq.db.Dataset(bq.namespace).Table(tableName)
-	err = tableRef.Delete(ctx)
-	return
+func (bq *BigQuery) DeleteTable(ctx context.Context, tableName string) error {
+	return bq.db.Dataset(bq.namespace).Table(tableName).Delete(ctx)
}

// CreateTable creates a table in BigQuery with the provided schema
@@ -218,13 +216,6 @@
return nil
}

-func (bq *BigQuery) DropTable(ctx context.Context, tableName string) error {
-	if err := bq.DeleteTable(ctx, tableName); err != nil {
-		return err
-	}
-	return bq.DeleteTable(ctx, tableName+"_view")
-}

// createTableView creates a view for the table to deduplicate the data
// If custom partition is enabled, it creates a view with the partition column and type. Otherwise, it creates a view with ingestion-time partitioning
func (bq *BigQuery) createTableView(ctx context.Context, tableName string, columnMap model.TableSchema) error {
@@ -303,6 +294,13 @@
return bq.db.Dataset(bq.namespace).Table(tableName+"_view").Create(ctx, metaData)
}
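Note for readers, since the view body is collapsed in this diff: a deduplicating view of this kind typically keeps the newest row per id, ranked by received_at. Below is a hedged sketch of what such a view query can look like; sketchViewQuery is a hypothetical helper for illustration, not code from this PR, and the exact SQL this repo generates may differ (assumes fmt is imported).

// sketchViewQuery is illustrative only: one common shape for a dedup view
// over an ingestion-time-partitioned table, keeping the newest row per id.
func sketchViewQuery(namespace, tableName string) string {
	return fmt.Sprintf("SELECT * EXCEPT (__row_number) FROM ("+
		"SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY received_at DESC) AS __row_number "+
		"FROM `%s.%s`) WHERE __row_number = 1", namespace, tableName)
}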

+func (bq *BigQuery) DropTable(ctx context.Context, tableName string) error {
+	if err := bq.DeleteTable(ctx, tableName); err != nil {
+		return err
+	}
+	return bq.DeleteTable(ctx, tableName+"_view")
+}

func (bq *BigQuery) schemaExists(ctx context.Context, _, _ string) (exists bool, err error) {
ds := bq.db.Dataset(bq.namespace)
_, err = ds.Metadata(ctx)
@@ -443,41 +441,41 @@
)
log.Infon("started loading")

-	loadFileLocations, err := bq.loadFileLocations(ctx, tableName)
+	loadFileLocation, err := bq.loadFileLocation(ctx, tableName)
if err != nil {
-		return nil, nil, fmt.Errorf("getting load file locations: %w", err)
+		return nil, nil, fmt.Errorf("getting load file location: %w", err)
	}

-	gcsRef := bigquery.NewGCSReference(warehouseutils.GetGCSLocations(
-		loadFileLocations,
-		warehouseutils.GCSLocationOptions{},
-	)...)
+	gcsRef := bigquery.NewGCSReference(loadFileLocation)
gcsRef.SourceFormat = bigquery.JSON
gcsRef.MaxBadRecords = 0
gcsRef.IgnoreUnknownValues = false

return bq.loadTableByAppend(ctx, tableName, gcsRef, log)
}
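Note on why a single URI now suffices: BigQuery load jobs expand a trailing * wildcard in a Cloud Storage URI on the server side, so one folder-level reference matches every load file under that prefix. A minimal sketch of the mechanism, assuming placeholder bucket, dataset, and table names (none of these values come from this PR):

func loadFromFolder(ctx context.Context, client *bigquery.Client) error {
	// gs://example-bucket/load-objects/users/* matches every object under
	// the folder, so one reference covers all of the table's load files.
	gcsRef := bigquery.NewGCSReference("gs://example-bucket/load-objects/users/*")
	gcsRef.SourceFormat = bigquery.JSON

	job, err := client.Dataset("example_dataset").Table("users").LoaderFrom(gcsRef).Run(ctx)
	if err != nil {
		return err
	}
	status, err := job.Wait(ctx)
	if err != nil {
		return err
	}
	return status.Err() // load failures surface on the completed job's status
}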

-func (bq *BigQuery) loadFileLocations(
+func (bq *BigQuery) loadFileLocation(
ctx context.Context,
tableName string,
-) ([]warehouseutils.LoadFile, error) {
+) (string, error) {
switch tableName {
case warehouseutils.IdentityMappingsTable, warehouseutils.IdentityMergeRulesTable:
loadfile, err := bq.uploader.GetSingleLoadFile(
ctx,
tableName,
)
if err != nil {
return nil, fmt.Errorf("getting single load file for table %s: %w", tableName, err)
return "", fmt.Errorf("getting single load file for table %s: %w", tableName, err)

Check warning on line 468 in warehouse/integrations/bigquery/bigquery.go

View check run for this annotation

Codecov / codecov/patch

warehouse/integrations/bigquery/bigquery.go#L468

Added line #L468 was not covered by tests
}
return []warehouseutils.LoadFile{loadfile}, nil
return loadfile.Location, nil
	default:
-		return bq.uploader.GetLoadFilesMetadata(
-			ctx,
-			warehouseutils.GetLoadFilesOptions{Table: tableName},
-		)
+		objectLocation, err := bq.uploader.GetSampleLoadFileLocation(ctx, tableName)
+		if err != nil {
+			return "", fmt.Errorf("getting sample load file location for table %s: %w", tableName, err)
+		}
+
+		gcsLocation := warehouseutils.GetGCSLocation(objectLocation, warehouseutils.GCSLocationOptions{})
+
+		return loadFolder(gcsLocation), nil
}
}

@@ -692,15 +690,12 @@
}

func (bq *BigQuery) createAndLoadStagingUsersTable(ctx context.Context, stagingTable string) error {
-	loadFileLocations, err := bq.loadFileLocations(ctx, warehouseutils.UsersTable)
+	loadFileLocation, err := bq.loadFileLocation(ctx, warehouseutils.UsersTable)
if err != nil {
return fmt.Errorf("getting load file locations: %w", err)
return fmt.Errorf("getting load file location: %w", err)

Check warning on line 695 in warehouse/integrations/bigquery/bigquery.go

View check run for this annotation

Codecov / codecov/patch

warehouse/integrations/bigquery/bigquery.go#L695

Added line #L695 was not covered by tests
}

-	gcsRef := bigquery.NewGCSReference(warehouseutils.GetGCSLocations(
-		loadFileLocations,
-		warehouseutils.GCSLocationOptions{},
-	)...)
+	gcsRef := bigquery.NewGCSReference(loadFileLocation)
gcsRef.SourceFormat = bigquery.JSON
gcsRef.MaxBadRecords = 0
gcsRef.IgnoreUnknownValues = false
@@ -1178,9 +1173,9 @@
}

func (bq *BigQuery) LoadTestTable(ctx context.Context, location, tableName string, _ map[string]interface{}, _ string) error {
-	gcsLocations := warehouseutils.GetGCSLocation(location, warehouseutils.GCSLocationOptions{})
+	gcsLocation := warehouseutils.GetGCSLocation(location, warehouseutils.GCSLocationOptions{})

-	gcsRef := bigquery.NewGCSReference([]string{gcsLocations}...)
+	gcsRef := bigquery.NewGCSReference(loadFolder(gcsLocation))
gcsRef.SourceFormat = bigquery.JSON
gcsRef.MaxBadRecords = 0
gcsRef.IgnoreUnknownValues = false
@@ -1213,6 +1208,10 @@
return nil
}

+func loadFolder(objectLocation string) string {
+	return warehouseutils.GetLocationFolder(objectLocation) + "/*"
+}
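Concretely, loadFolder trims the object name from a load-file location and appends the wildcard. A quick illustration as a Go example test, using a placeholder bucket (illustrative, not part of this PR):

func Example_loadFolder() {
	// The object segment "f1.json.gz" is trimmed and "/*" appended.
	fmt.Println(loadFolder("gs://example-bucket/load-objects/users/src/f1.json.gz"))
	// Output: gs://example-bucket/load-objects/users/src/*
}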

func (*BigQuery) SetConnectionTimeout(_ time.Duration) {
}

68 changes: 68 additions & 0 deletions warehouse/integrations/bigquery/bigquery_test.go
@@ -4,12 +4,14 @@ import (
"context"
"errors"
"fmt"
"io"
"os"
"slices"
"testing"
"time"

"cloud.google.com/go/bigquery"
"github.com/google/uuid"
"github.com/samber/lo"
"go.uber.org/mock/gomock"
"google.golang.org/api/iterator"
@@ -1095,6 +1097,71 @@ func TestIntegration(t *testing.T) {
)
require.Equal(t, records, whth.SampleTestRecords())
})
t.Run("multiple files", func(t *testing.T) {
tableName := "multiple_files_test_table"
repeat := 10
loadObjectFolder := "rudder-warehouse-load-objects"
sourceID := "test_source_id"

prefixes := []string{loadObjectFolder, tableName, sourceID, uuid.New().String() + "-" + tableName}

loadFiles := lo.RepeatBy(repeat, func(int) whutils.LoadFile {
achettyiitr marked this conversation as resolved.
Show resolved Hide resolved
sourceFile, err := os.Open("../testdata/load.json.gz")
require.NoError(t, err)
defer func() { _ = sourceFile.Close() }()

tempFile, err := os.CreateTemp("", "clone_*.json.gz")
require.NoError(t, err)
defer func() { _ = tempFile.Close() }()

_, err = io.Copy(tempFile, sourceFile)
require.NoError(t, err)

f, err := os.Open(tempFile.Name())
require.NoError(t, err)
defer func() { _ = f.Close() }()

uploadOutput, err := fm.Upload(context.Background(), f, prefixes...)
require.NoError(t, err)
return whutils.LoadFile{Location: uploadOutput.Location}
})
mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse)

bq := whbigquery.New(config.New(), logger.NOP)
require.NoError(t, bq.Setup(ctx, warehouse, mockUploader))
require.NoError(t, bq.CreateSchema(ctx))
require.NoError(t, bq.CreateTable(ctx, tableName, schemaInWarehouse))

loadTableStat, err := bq.LoadTable(ctx, tableName)
require.NoError(t, err)
require.Equal(t, loadTableStat.RowsInserted, int64(repeat*14))
require.Equal(t, loadTableStat.RowsUpdated, int64(0))

records := bqhelper.RetrieveRecordsFromWarehouse(t, db,
fmt.Sprintf(`
SELECT
id,
received_at,
test_bool,
test_datetime,
test_float,
test_int,
test_string
FROM %s.%s
WHERE _PARTITIONTIME BETWEEN TIMESTAMP('%s') AND TIMESTAMP('%s')
ORDER BY id;`,
namespace,
tableName,
time.Now().Add(-24*time.Hour).Format("2006-01-02"),
time.Now().Add(+24*time.Hour).Format("2006-01-02"),
),
)
expectedRecords := make([][]string, 0, repeat)
for i := 0; i < repeat; i++ {
expectedRecords = append(expectedRecords, whth.SampleTestRecords()...)
}
require.ElementsMatch(t, expectedRecords, records)
})
})

t.Run("Fetch schema", func(t *testing.T) {
@@ -1470,6 +1537,7 @@ func newMockUploader(
).AnyTimes()
mockUploader.EXPECT().GetTableSchemaInUpload(tableName).Return(schemaInUpload).AnyTimes()
mockUploader.EXPECT().GetTableSchemaInWarehouse(tableName).Return(schemaInWarehouse).AnyTimes()
+	mockUploader.EXPECT().GetSampleLoadFileLocation(gomock.Any(), tableName).Return(loadFiles[0].Location, nil).AnyTimes()

return mockUploader
}
32 changes: 6 additions & 26 deletions warehouse/utils/utils.go
@@ -95,7 +95,6 @@ const (
const (
WAREHOUSE = "warehouse"
RudderMissingDatatype = "warehouse_rudder_missing_datatype"
-	MissingDatatype = "<missing_datatype>"
)

const (
@@ -290,8 +289,7 @@ func GetObjectFolderForDeltalake(provider, location string) (folder string) {
blobUrlParts := azblob.NewBlobURLParts(*blobUrl)
accountName := strings.Replace(blobUrlParts.Host, ".blob.core.windows.net", "", 1)
blobLocation := fmt.Sprintf("wasbs://%s@%s.blob.core.windows.net/%s", blobUrlParts.ContainerName, accountName, blobUrlParts.BlobName)
-		lastPos := strings.LastIndex(blobLocation, "/")
-		folder = blobLocation[:lastPos]
+		folder = GetLocationFolder(blobLocation)
}
return
}
@@ -389,8 +387,7 @@ func GetS3Location(location string) (s3Location, region string) {
// https://test-bucket.s3.amazonaws.com/myfolder/test-object.csv --> s3://test-bucket/myfolder
func GetS3LocationFolder(location string) string {
s3Location, _ := GetS3Location(location)
-	lastPos := strings.LastIndex(s3Location, "/")
-	return s3Location[:lastPos]
+	return GetLocationFolder(s3Location)
}

type GCSLocationOptions struct {
@@ -413,16 +410,11 @@ func GetGCSLocation(location string, options GCSLocationOptions) string {
// GetGCSLocationFolder returns the folder path for a gcs object
// https://storage.googleapis.com/test-bucket/myfolder/test-object.csv --> gcs://test-bucket/myfolder
func GetGCSLocationFolder(location string, options GCSLocationOptions) string {
-	s3Location := GetGCSLocation(location, options)
-	lastPos := strings.LastIndex(s3Location, "/")
-	return s3Location[:lastPos]
+	return GetLocationFolder(GetGCSLocation(location, options))
}

-func GetGCSLocations(loadFiles []LoadFile, options GCSLocationOptions) (gcsLocations []string) {
-	for _, loadFile := range loadFiles {
-		gcsLocations = append(gcsLocations, GetGCSLocation(loadFile.Location, options))
-	}
-	return
+func GetLocationFolder(location string) string {
+	return location[:strings.LastIndex(location, "/")]
}
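With GetLocationFolder now backing the S3, GCS, Azure, and Deltalake folder helpers, here is a small table-driven sketch of the trimming behavior it provides (an illustrative test, not part of this PR; inputs are placeholders):

func TestGetLocationFolder(t *testing.T) {
	// Each location loses its trailing object segment, keeping the folder.
	testCases := []struct{ location, folder string }{
		{"s3://test-bucket/myfolder/test-object.csv", "s3://test-bucket/myfolder"},
		{"gs://test-bucket/myfolder/test-object.csv", "gs://test-bucket/myfolder"},
		{"azure://myproject.blob.core.windows.net/test-bucket/test-object.csv", "azure://myproject.blob.core.windows.net/test-bucket"},
	}
	for _, tc := range testCases {
		require.Equal(t, tc.folder, GetLocationFolder(tc.location))
	}
}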

// GetAzureBlobLocation parses path-style location http url to return in azure:// format
@@ -435,9 +427,7 @@ func GetAzureBlobLocation(location string) string {
// GetAzureBlobLocationFolder returns the folder path for an azure storage object
// https://myproject.blob.core.windows.net/test-bucket/myfolder/test-object.csv --> azure://myproject.blob.core.windows.net/myfolder
func GetAzureBlobLocationFolder(location string) string {
-	s3Location := GetAzureBlobLocation(location)
-	lastPos := strings.LastIndex(s3Location, "/")
-	return s3Location[:lastPos]
+	return GetLocationFolder(GetAzureBlobLocation(location))
}

func GetS3Locations(loadFiles []LoadFile) []LoadFile {
@@ -819,16 +809,6 @@ func GetLoadFileFormat(loadFileType string) string {
}
}

-func GetDateRangeList(start, end time.Time, dateFormat string) (dateRange []string) {
-	if (start == time.Time{} || end == time.Time{}) {
-		return
-	}
-	for d := start; !d.After(end); d = d.AddDate(0, 0, 1) {
-		dateRange = append(dateRange, d.Format(dateFormat))
-	}
-	return
-}

func StagingTablePrefix(provider string) string {
return ToProviderCase(provider, stagingTablePrefix)
}
26 changes: 0 additions & 26 deletions warehouse/utils/utils_test.go
@@ -405,24 +405,6 @@ func TestGetGCSLocationFolder(t *testing.T) {
}
}

-func TestGetGCSLocations(t *testing.T) {
-	inputs := []LoadFile{
-		{Location: "https://storage.googleapis.com/test-bucket/test-object.csv"},
-		{Location: "https://storage.googleapis.com/my.test-bucket/test-object.csv"},
-		{Location: "https://storage.googleapis.com/my.test-bucket2/test-object.csv"},
-		{Location: "https://storage.googleapis.com/my.test-bucket/test-object2.csv"},
-	}
-	outputs := []string{
-		"gs://test-bucket/test-object.csv",
-		"gs://my.test-bucket/test-object.csv",
-		"gs://my.test-bucket2/test-object.csv",
-		"gs://my.test-bucket/test-object2.csv",
-	}
-
-	gcsLocations := GetGCSLocations(inputs, GCSLocationOptions{})
-	require.Equal(t, gcsLocations, outputs)
-}

func TestGetAzureBlobLocation(t *testing.T) {
inputs := []struct {
location string
@@ -1193,14 +1175,6 @@ var _ = Describe("Utils", func() {
Entry(nil, json.RawMessage(`{"k1": { "k2": "v2" }}`), model.Schema{"k1": {"k2": "v2"}}),
)

DescribeTable("Get date range list", func(start, end time.Time, format string, expected []string) {
Expect(GetDateRangeList(start, end, format)).To(Equal(expected))
},
Entry("Same day", time.Now(), time.Now(), "2006-01-02", []string{time.Now().Format("2006-01-02")}),
Entry("Multiple days", time.Now(), time.Now().AddDate(0, 0, 1), "2006-01-02", []string{time.Now().Format("2006-01-02"), time.Now().AddDate(0, 0, 1).Format("2006-01-02")}),
Entry("No days", nil, nil, "2006-01-02", nil),
)

DescribeTable("Staging table prefix", func(provider string) {
Expect(StagingTablePrefix(provider)).To(Equal(ToProviderCase(provider, "rudder_staging_")))
},