Skip to content

Commit

Permalink
feat: warehouse transformer
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Oct 20, 2024
1 parent da9f8c8 commit 98f7827
Show file tree
Hide file tree
Showing 54 changed files with 15,504 additions and 5 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ require (
github.com/databricks/databricks-sql-go v1.6.1
github.com/denisenkom/go-mssqldb v0.12.3
github.com/dgraph-io/badger/v4 v4.3.1
github.com/dlclark/regexp2 v1.11.4
github.com/docker/docker v27.3.1+incompatible
github.com/go-chi/chi/v5 v5.1.0
github.com/go-redis/redis v6.15.9+incompatible
Expand Down Expand Up @@ -187,7 +188,6 @@ require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dgraph-io/ristretto v1.0.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/dlclark/regexp2 v1.11.4 // indirect
github.com/dnephin/pflag v1.0.7 // indirect
github.com/docker/cli v27.2.1+incompatible // indirect
github.com/docker/cli-docs-tool v0.8.0 // indirect
Expand Down
20 changes: 16 additions & 4 deletions processor/transformer/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,13 +140,25 @@ func WithClient(client *http.Client) Opt {
}
}

// Transformer provides methods to transform events
type Transformer interface {
Transform(ctx context.Context, clientEvents []TransformerEvent, batchSize int) Response
type UserTransformer interface {
UserTransform(ctx context.Context, clientEvents []TransformerEvent, batchSize int) Response
}

type DestinationTransformer interface {
Transform(ctx context.Context, clientEvents []TransformerEvent, batchSize int) Response
}

type TrackingPlanValidator interface {
Validate(ctx context.Context, clientEvents []TransformerEvent, batchSize int) Response
}

// Transformer provides methods to transform events
type Transformer interface {
UserTransformer
DestinationTransformer
TrackingPlanValidator
}

// handle is the handle for this class
type handle struct {
sentStat stats.Measurement
Expand Down Expand Up @@ -526,7 +538,7 @@ func (trans *handle) destTransformURL(destType string) string {
destinationEndPoint := fmt.Sprintf("%s/v0/destinations/%s", trans.config.destTransformationURL, strings.ToLower(destType))

if _, ok := warehouseutils.WarehouseDestinationMap[destType]; ok {
whSchemaVersionQueryParam := fmt.Sprintf("whSchemaVersion=%s&whIDResolve=%v", trans.conf.GetString("Warehouse.schemaVersion", "v1"), warehouseutils.IDResolutionEnabled())
whSchemaVersionQueryParam := fmt.Sprintf("whIDResolve=%t", trans.conf.GetBool("Warehouse.enableIDResolution", false))
switch destType {
case warehouseutils.RS:
return destinationEndPoint + "?" + whSchemaVersionQueryParam
Expand Down
62 changes: 62 additions & 0 deletions warehouse/transformer/alias.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package transformer

import (
"fmt"

"github.com/samber/lo"

"github.com/rudderlabs/rudder-server/warehouse/transformer/internal/rules"
)

func (t *transformer) handleAliasEvent(pi *processingInfo) ([]map[string]any, error) {
event := make(map[string]any)
columnTypes := make(map[string]string)

if err := t.setDataAndColumnTypeFromInput(pi, pi.event.Message["traits"], event, columnTypes, &prefixInfo{
completePrefix: "alias_traits_",
completeLevel: 2,
}); err != nil {
return nil, fmt.Errorf("alias: setting data and column types from input: %w", err)
}
if err := t.setDataAndColumnTypeFromInput(pi, pi.event.Message["context"], event, columnTypes, &prefixInfo{
completePrefix: "alias_context_",
completeLevel: 2,
prefix: "context_",
}); err != nil {
return nil, fmt.Errorf("alias: setting data and column types from input: %w", err)
}
if err := t.setDataAndColumnTypeFromRules(pi, event, columnTypes,
lo.Assign(rules.DefaultRules, rules.AliasRules), rules.DefaultFunctionalRules,
); err != nil {
return nil, fmt.Errorf("alias: setting data and column types from rules: %w", err)
}

if err := storeRudderEvent(pi, event, columnTypes); err != nil {
return nil, fmt.Errorf("alias: storing rudder event: %w", err)
}

tableName, err := SafeTableName(pi.event.Metadata.DestinationType, pi.itrOpts, "aliases")
if err != nil {
return nil, fmt.Errorf("alias: safe table name: %w", err)
}
columns, err := t.getColumns(pi.event.Metadata.DestinationType, event, columnTypes)
if err != nil {
return nil, fmt.Errorf("alias: getting columns: %w", err)
}

mergeEvents, err := t.handleMergeEvent(pi)
if err != nil {
return nil, fmt.Errorf("merge event: %w", err)
}

aliasOutput := map[string]any{
"data": event,
"metadata": map[string]any{
"table": tableName,
"columns": columns,
"receivedAt": pi.event.Metadata.ReceivedAt,
},
"userId": "",
}
return append([]map[string]any{aliasOutput}, mergeEvents...), nil
}
Loading

0 comments on commit 98f7827

Please sign in to comment.