Skip to content

Commit

Permalink
chore: review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Jan 31, 2025
1 parent ce9a97b commit 45dcaa9
Show file tree
Hide file tree
Showing 11 changed files with 281 additions and 160 deletions.
73 changes: 8 additions & 65 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,16 @@ import (
"encoding/json"
"errors"
"fmt"
"reflect"
"runtime/trace"
"slices"
"strconv"
"strings"
"sync"
"time"

obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"

"github.com/google/uuid"

"github.com/rudderlabs/rudder-server/enterprise/trackedusers"
"github.com/rudderlabs/rudder-server/utils/timeutil"
whutils "github.com/rudderlabs/rudder-server/warehouse/utils"

"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -90,9 +86,9 @@ type trackedUsersReporter interface {
GenerateReportsFromJobs(jobs []*jobsdb.JobT, sourceIdFilter map[string]bool) []*trackedusers.UsersReport
}

type warehouseTransformation interface {
type warehouseTransformer interface {
transformer.DestinationTransformer
Log(events []types.SingularEventT, metadata *transformer.Metadata) error
CompareAndLog(events []transformer.TransformerEvent, pResponse, wResponse transformer.Response, metadata *transformer.Metadata, eventsByMessageID map[string]types.SingularEventWithReceivedAt)
}

// Handle is a handle to the processor module
Expand All @@ -101,7 +97,7 @@ type Handle struct {
tracer stats.Tracer
backendConfig backendconfig.BackendConfig
transformer transformer.Transformer
warehouseTransformer warehouseTransformation
warehouseTransformer warehouseTransformer

gatewayDB jobsdb.JobsDB
routerDB jobsdb.JobsDB
Expand Down Expand Up @@ -173,12 +169,6 @@ type Handle struct {
enableWarehouseTransformations config.ValueLoader[bool]
}

warehouseTransformerStats struct {
responseTime stats.Timer
mismatches stats.Counter
logTime stats.Timer
}

drainConfig struct {
jobRunIDs config.ValueLoader[[]string]
}
Expand Down Expand Up @@ -638,9 +628,6 @@ func (proc *Handle) Setup(
}

proc.warehouseTransformer = wtrans.New(proc.conf, proc.logger, proc.statsFactory)
proc.warehouseTransformerStats.responseTime = proc.statsFactory.NewStat("proc_warehouse_transformations_time", stats.TimerType)
proc.warehouseTransformerStats.mismatches = proc.statsFactory.NewStat("proc_warehouse_transformations_mismatches", stats.CountType)
proc.warehouseTransformerStats.logTime = proc.statsFactory.NewStat("proc_warehouse_transformations_log_time", stats.TimerType)

if proc.config.enableDedup {
var err error
Expand Down Expand Up @@ -2668,10 +2655,10 @@ type transformSrcDestOutput struct {
func (proc *Handle) transformSrcDest(
ctx context.Context,
partition string,
// main inputs
// main inputs
srcAndDestKey string, eventList []transformer.TransformerEvent,

// helpers
// helpers
trackingPlanEnabledMap map[SourceIDT]bool,
eventsByMessageID map[string]types.SingularEventWithReceivedAt,
uniqueMessageIdsBySrcDestKey map[string]map[string]struct{},
Expand Down Expand Up @@ -2928,7 +2915,7 @@ func (proc *Handle) transformSrcDest(
proc.logger.Debug("Dest Transform input size", len(eventsToTransform))
s := time.Now()
response = proc.transformer.Transform(ctx, eventsToTransform, proc.config.transformBatchSize.Load())
proc.handleResponseForWarehouseTransformation(ctx, eventsToTransform, response, commonMetaData, eventsByMessageID)
proc.handleWarehouseTransformations(ctx, eventsToTransform, response, commonMetaData, eventsByMessageID)

destTransformationStat := proc.newDestinationTransformationStat(sourceID, workspaceID, transformAt, destination)
destTransformationStat.transformTime.Since(s)
Expand Down Expand Up @@ -3087,7 +3074,7 @@ func (proc *Handle) transformSrcDest(
}
}

func (proc *Handle) handleResponseForWarehouseTransformation(
func (proc *Handle) handleWarehouseTransformations(
ctx context.Context,
eventsToTransform []transformer.TransformerEvent,
pResponse transformer.Response,
Expand All @@ -3101,52 +3088,8 @@ func (proc *Handle) handleResponseForWarehouseTransformation(
return
}

transformStartAt := timeutil.Now()
wResponse := proc.warehouseTransformer.Transform(ctx, eventsToTransform, proc.config.transformBatchSize.Load())
proc.warehouseTransformerStats.responseTime.Since(transformStartAt)

logStartAt := timeutil.Now()
differingEvents := proc.warehouseTransDifferEvents(eventsToTransform, pResponse, wResponse, eventsByMessageID)
if err := proc.warehouseTransformer.Log(differingEvents, commonMetaData); err != nil {
proc.logger.Warnn("Failed to log events for warehouse transformation debugging", obskit.Error(err))
}
proc.warehouseTransformerStats.logTime.Since(logStartAt)
}

func (proc *Handle) warehouseTransDifferEvents(
eventsToTransform []transformer.TransformerEvent,
pResponse, wResponse transformer.Response,
eventsByMessageID map[string]types.SingularEventWithReceivedAt,
) []types.SingularEventT {
// If the event counts differ, return all events in the transformation
if len(pResponse.Events) != len(wResponse.Events) || len(pResponse.FailedEvents) != len(wResponse.FailedEvents) {
events := lo.Map(eventsToTransform, func(e transformer.TransformerEvent, _ int) types.SingularEventT {
return eventsByMessageID[e.Metadata.MessageID].SingularEvent
})
proc.warehouseTransformerStats.mismatches.Count(len(events))
return events
}

var (
differedSampleEvents []types.SingularEventT
differedEventsCount int
)

for i := range pResponse.Events {
if reflect.DeepEqual(pResponse.Events[i], wResponse.Events[i]) {
continue
}

differedEventsCount++
if len(differedSampleEvents) != 0 {
// Collect the mismatched messages and break (sample only)
differedSampleEvents = append(differedSampleEvents, lo.Map(pResponse.Events[i].Metadata.GetMessagesIDs(), func(msgID string, _ int) types.SingularEventT {
return eventsByMessageID[msgID].SingularEvent
})...)
}
}
proc.warehouseTransformerStats.mismatches.Count(differedEventsCount)
return differedSampleEvents
proc.warehouseTransformer.CompareAndLog(eventsToTransform, pResponse, wResponse, commonMetaData, eventsByMessageID)
}

func (proc *Handle) saveDroppedJobs(ctx context.Context, droppedJobs []*jobsdb.JobT, tx *Tx) error {
Expand Down
19 changes: 14 additions & 5 deletions warehouse/transformer/datatype.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package transformer

import (
"reflect"

"github.com/rudderlabs/rudder-server/warehouse/internal/model"
"github.com/rudderlabs/rudder-server/warehouse/transformer/internal/utils"
whutils "github.com/rudderlabs/rudder-server/warehouse/utils"
Expand Down Expand Up @@ -35,6 +37,8 @@ func primitiveType(val any) string {
}

func getFloatType(v float64) string {
// JSON unmarshalling treats all numbers as float64 by default, even if they are whole numbers
// So, we need to check if the float is actually an integer
if v == float64(int64(v)) {
return model.IntDataType
}
Expand All @@ -55,14 +59,14 @@ func dataTypeOverride(destType, key string, val any, isJSONKey bool) string {
}

func overrideForPostgres(key string, isJSONKey bool) string {
if key == violationErrors || isJSONKey {
if isJSONKey || key == violationErrors {
return model.JSONDataType
}
return model.StringDataType
}

func overrideForSnowflake(key string, isJSONKey bool) string {
if key == violationErrors || isJSONKey {
if isJSONKey || key == violationErrors {
return model.JSONDataType
}
return model.StringDataType
Expand All @@ -75,10 +79,15 @@ func overrideForRedshift(val any, isJSONKey bool) string {
if val == nil {
return model.StringDataType
}
if jsonVal, _ := json.Marshal(val); len(jsonVal) > redshiftStringLimit {
return model.TextDataType
switch reflect.TypeOf(val).Kind() {
case reflect.Slice, reflect.Array:
if jsonVal, _ := json.Marshal(val); len(jsonVal) > redshiftStringLimit {
return model.TextDataType
}
return model.StringDataType
default:
return model.StringDataType
}
return model.StringDataType
}

func convertValIfDateTime(val any, colType string) any {
Expand Down
122 changes: 61 additions & 61 deletions warehouse/transformer/internal/snakecase/snakecase.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,65 +9,65 @@ import (

const (
// Used to compose unicode character classes.
rsAstralRange = "\\ud800-\\udfff"
rsComboMarksRange = "\\u0300-\\u036f"
reComboHalfMarksRange = "\\ufe20-\\ufe2f"
rsComboSymbolsRange = "\\u20d0-\\u20ff"
rsComboMarksExtendedRange = "\\u1ab0-\\u1aff"
rsComboMarksSupplementRange = "\\u1dc0-\\u1dff"
rsComboRange = rsComboMarksRange + reComboHalfMarksRange + rsComboSymbolsRange + rsComboMarksExtendedRange + rsComboMarksSupplementRange
rsDingbatRange = "\\u2700-\\u27bf"
rsLowerRange = "a-z\\xdf-\\xf6\\xf8-\\xff"
rsMathOpRange = "\\xac\\xb1\\xd7\\xf7"
rsNonCharRange = "\\x00-\\x2f\\x3a-\\x40\\x5b-\\x60\\x7b-\\xbf"
rsPunctuationRange = "\\u2000-\\u206f"
rsSpaceRange = " \\t\\x0b\\f\\xa0\\ufeff\\n\\r\\u2028\\u2029\\u1680\\u180e\\u2000\\u2001\\u2002\\u2003\\u2004\\u2005\\u2006\\u2007\\u2008\\u2009\\u200a\\u202f\\u205f\\u3000"
rsUpperRange = "A-Z\\xc0-\\xd6\\xd8-\\xde"
rsVarRange = "\\ufe0e\\ufe0f"
rsBreakRange = rsMathOpRange + rsNonCharRange + rsPunctuationRange + rsSpaceRange
astralRange = "\\ud800-\\udfff"
comboMarksRange = "\\u0300-\\u036f"
comboHalfMarksRange = "\\ufe20-\\ufe2f"
comboSymbolsRange = "\\u20d0-\\u20ff"
comboMarksExtendedRange = "\\u1ab0-\\u1aff"
comboMarksSupplementRange = "\\u1dc0-\\u1dff"
comboRange = comboMarksRange + comboHalfMarksRange + comboSymbolsRange + comboMarksExtendedRange + comboMarksSupplementRange
dingbatRange = "\\u2700-\\u27bf"
lowerRange = "a-z\\xdf-\\xf6\\xf8-\\xff"
mathOpRange = "\\xac\\xb1\\xd7\\xf7"
nonCharRange = "\\x00-\\x2f\\x3a-\\x40\\x5b-\\x60\\x7b-\\xbf"
punctuationRange = "\\u2000-\\u206f"
spaceRange = " \\t\\x0b\\f\\xa0\\ufeff\\n\\r\\u2028\\u2029\\u1680\\u180e\\u2000\\u2001\\u2002\\u2003\\u2004\\u2005\\u2006\\u2007\\u2008\\u2009\\u200a\\u202f\\u205f\\u3000"
upperRange = "A-Z\\xc0-\\xd6\\xd8-\\xde"
varRange = "\\ufe0e\\ufe0f"
breakRange = mathOpRange + nonCharRange + punctuationRange + spaceRange

// Used to compose unicode capture groups
rsApos = "['\u2019]"
rsBreak = "[" + rsBreakRange + "]"
rsCombo = "[" + rsComboRange + "]"
rsDigit = "\\d"
rsDingbat = "[" + rsDingbatRange + "]"
rsLower = "[" + rsLowerRange + "]"
rsMisc = "[^" + rsAstralRange + rsBreakRange + rsDigit + rsDingbatRange + rsLowerRange + rsUpperRange + "]"
rsFitz = "\\ud83c[\\udffb-\\udfff]"
rsModifier = "(?:" + rsCombo + "|" + rsFitz + ")"
rsNonAstral = "[^" + rsAstralRange + "]"
rsRegional = "(?:\\ud83c[\\udde6-\\uddff]){2}"
rsSurrPair = "[\\ud800-\\udbff][\\udc00-\\udfff]"
rsUpper = "[" + rsUpperRange + "]"
rsZWJ = "\\u200d"
apos = "['\u2019]"
breakExp = "[" + breakRange + "]"
combo = "[" + comboRange + "]"
digit = "\\d"
dingbat = "[" + dingbatRange + "]"
lower = "[" + lowerRange + "]"
misc = "[^" + astralRange + breakRange + digit + dingbatRange + lowerRange + upperRange + "]"
fitz = "\\ud83c[\\udffb-\\udfff]"
modifier = "(?:" + combo + "|" + fitz + ")"
nonAstral = "[^" + astralRange + "]"
regional = "(?:\\ud83c[\\udde6-\\uddff]){2}"
surrPair = "[\\ud800-\\udbff][\\udc00-\\udfff]"
upper = "[" + upperRange + "]"
zwj = "\\u200d"

// Used to compose unicode regexes
rsMiscLower = "(?:" + rsLower + "|" + rsMisc + ")"
rsMiscUpper = "(?:" + rsUpper + "|" + rsMisc + ")"
rsOptContrLower = "(?:" + rsApos + "(?:d|ll|m|re|s|t|ve))?"
rsOptContrUpper = "(?:" + rsApos + "(?:D|LL|M|RE|S|T|VE))?"
reOptMod = rsModifier + "?"
rsOptVar = "[" + rsVarRange + "]?"
rsOptJoin = "(?:" + rsZWJ + "(?:" + rsNonAstral + "|" + rsRegional + "|" + rsSurrPair + ")" + rsOptVar + reOptMod + ")*"
rsOrdLower = "\\d*(?:1st|2nd|3rd|(?![123])\\dth)(?=\\b|[A-Z_])"
rsOrdUpper = "\\d*(?:1ST|2ND|3RD|(?![123])\\dTH)(?=\\b|[a-z_])"
rsSeq = rsOptVar + reOptMod + rsOptJoin
rsEmoji = "(?:" + rsDingbat + "|" + rsRegional + "|" + rsSurrPair + ")" + rsSeq
miscLower = "(?:" + lower + "|" + misc + ")"
miscUpper = "(?:" + upper + "|" + misc + ")"
optContrLower = "(?:" + apos + "(?:d|ll|m|re|s|t|ve))?"
optContrUpper = "(?:" + apos + "(?:D|LL|M|RE|S|T|VE))?"
optMod = modifier + "?"
optVar = "[" + varRange + "]?"
optJoin = "(?:" + zwj + "(?:" + nonAstral + "|" + regional + "|" + surrPair + ")" + optVar + optMod + ")*"
ordLower = "\\d*(?:1st|2nd|3rd|(?![123])\\dth)(?=\\b|[A-Z_])"
ordUpper = "\\d*(?:1ST|2ND|3RD|(?![123])\\dTH)(?=\\b|[a-z_])"
seq = optVar + optMod + optJoin
emoji = "(?:" + dingbat + "|" + regional + "|" + surrPair + ")" + seq
)

var (
reUnicodeWords = regexp2.MustCompile(
strings.Join(
[]string{
rsUpper + "?" + rsLower + "+" + rsOptContrLower + "(?=" + rsBreak + "|" + rsUpper + "|" + "$)", // Regular words, lowercase letters followed by optional contractions
rsMiscUpper + "+" + rsOptContrUpper + "(?=" + rsBreak + "|" + rsUpper + rsMiscLower + "|" + "$)", // Miscellaneous uppercase characters with optional contractions
rsUpper + "?" + rsMiscLower + "+" + rsOptContrLower, // Miscellaneous lowercase sequences with optional contractions
rsUpper + "+" + rsOptContrUpper, // All uppercase words with optional contractions (e.g., "THIS")
rsOrdUpper, // Ordinals for uppercase (e.g., "1ST", "2ND")
rsOrdLower, // Ordinals for lowercase (e.g., "1st", "2nd")
rsDigit + "+", // Pure digits (e.g., "123")
rsEmoji, // Emojis (e.g., 😀, ❤️)
upper + "?" + lower + "+" + optContrLower + "(?=" + breakExp + "|" + upper + "|" + "$)", // Regular words, lowercase letters followed by optional contractions
miscUpper + "+" + optContrUpper + "(?=" + breakExp + "|" + upper + miscLower + "|" + "$)", // Miscellaneous uppercase characters with optional contractions
upper + "?" + miscLower + "+" + optContrLower, // Miscellaneous lowercase sequences with optional contractions
upper + "+" + optContrUpper, // All uppercase words with optional contractions (e.g., "THIS")
ordUpper, // Ordinals for uppercase (e.g., "1ST", "2ND")
ordLower, // Ordinals for lowercase (e.g., "1st", "2nd")
digit + "+", // Pure digits (e.g., "123")
emoji, // Emojis (e.g., 😀, ❤️)
},
"|",
),
Expand All @@ -76,18 +76,18 @@ var (
reUnicodeWordsWithNumbers = regexp2.MustCompile(
strings.Join(
[]string{
rsUpper + "?" + rsLower + "+" + rsDigit + "+", // Lowercase letters followed by digits (e.g., "abc123")
rsUpper + "+" + rsDigit + "+", // Uppercase letters followed by digits (e.g., "ABC123")
rsDigit + "+" + rsUpper + "?" + rsLower + "+", // Digits followed by lowercase letters (e.g., "123abc")
rsDigit + "+" + rsUpper + "+", // Digits followed by uppercase letters (e.g., "123ABC")
rsUpper + "?" + rsLower + "+" + rsOptContrLower + "(?=" + rsBreak + "|" + rsUpper + "|" + "$)", // Regular words, lowercase letters followed by optional contractions
rsMiscUpper + "+" + rsOptContrUpper + "(?=" + rsBreak + "|" + rsUpper + rsMiscLower + "|" + "$)", // Miscellaneous uppercase characters with optional contractions
rsUpper + "?" + rsMiscLower + "+" + rsOptContrLower, // Miscellaneous lowercase sequences with optional contractions
rsUpper + "+" + rsOptContrUpper, // All uppercase words with optional contractions (e.g., "THIS")
rsOrdUpper, // Ordinals for uppercase (e.g., "1ST", "2ND")
rsOrdLower, // Ordinals for lowercase (e.g., "1st", "2nd")
rsDigit + "+", // Pure digits (e.g., "123")
rsEmoji, // Emojis (e.g., 😀, ❤️)
upper + "?" + lower + "+" + digit + "+", // Lowercase letters followed by digits (e.g., "abc123")
upper + "+" + digit + "+", // Uppercase letters followed by digits (e.g., "ABC123")
digit + "+" + upper + "?" + lower + "+", // Digits followed by lowercase letters (e.g., "123abc")
digit + "+" + upper + "+", // Digits followed by uppercase letters (e.g., "123ABC")
upper + "?" + lower + "+" + optContrLower + "(?=" + breakExp + "|" + upper + "|" + "$)", // Regular words, lowercase letters followed by optional contractions
miscUpper + "+" + optContrUpper + "(?=" + breakExp + "|" + upper + miscLower + "|" + "$)", // Miscellaneous uppercase characters with optional contractions
upper + "?" + miscLower + "+" + optContrLower, // Miscellaneous lowercase sequences with optional contractions
upper + "+" + optContrUpper, // All uppercase words with optional contractions (e.g., "THIS")
ordUpper, // Ordinals for uppercase (e.g., "1ST", "2ND")
ordLower, // Ordinals for lowercase (e.g., "1st", "2nd")
digit + "+", // Pure digits (e.g., "123")
emoji, // Emojis (e.g., 😀, ❤️)
},
"|",
),
Expand Down
2 changes: 1 addition & 1 deletion warehouse/transformer/internal/snakecase/snakecase_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func TestToSnakeCase(t *testing.T) {
}
})
})
t.Run("extractWords", func(t *testing.T) {
t.Run("extractWordsWithNumbers", func(t *testing.T) {
t.Run("should match words containing Latin Unicode letters", func(t *testing.T) {
for _, letter := range burredLetters {
require.Equal(t, []string{string(letter)}, extractWordsWithNumbers(string(letter)))
Expand Down
21 changes: 12 additions & 9 deletions warehouse/transformer/jsonpaths.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,20 @@ import (
func extractJSONPathInfo(jsonPaths []string) jsonPathInfo {
keysMap, legacyKeysMap := make(map[string]int), make(map[string]int)
for _, jsonPath := range jsonPaths {
if trimmedJSONPath := strings.TrimSpace(jsonPath); trimmedJSONPath != "" {
splitPaths := strings.Split(jsonPath, ".")
key := strings.Join(splitPaths, "_")
pos := len(splitPaths) - 1
trimmedJSONPath := strings.TrimSpace(jsonPath)
if trimmedJSONPath == "" {
continue
}

splitPaths := strings.Split(trimmedJSONPath, ".")
key := strings.Join(splitPaths, "_")
pos := len(splitPaths) - 1

if utils.HasJSONPathPrefix(jsonPath) {
keysMap[key] = pos
continue
}
legacyKeysMap[key] = pos
if utils.HasJSONPathPrefix(trimmedJSONPath) {
keysMap[key] = pos
continue
}
legacyKeysMap[key] = pos
}
return jsonPathInfo{keysMap, legacyKeysMap}
}
Expand Down
Loading

0 comments on commit 45dcaa9

Please sign in to comment.