From 45dcaa9889891af0d697ebc8ea906fc8f67f85dc Mon Sep 17 00:00:00 2001 From: achettyiitr Date: Fri, 31 Jan 2025 01:54:08 +0530 Subject: [PATCH] chore: review comments --- processor/processor.go | 73 ++--------- warehouse/transformer/datatype.go | 19 ++- .../internal/snakecase/snakecase.go | 122 +++++++++--------- .../internal/snakecase/snakecase_test.go | 2 +- warehouse/transformer/jsonpaths.go | 21 +-- warehouse/transformer/jsonpaths_test.go | 4 +- warehouse/transformer/logger.go | 65 +++++++++- warehouse/transformer/safe.go | 12 +- warehouse/transformer/transformer.go | 8 +- warehouse/transformer/transformer_test.go | 111 +++++++++++++++- warehouse/transformer/types.go | 4 + 11 files changed, 281 insertions(+), 160 deletions(-) diff --git a/processor/processor.go b/processor/processor.go index 38b0cfd124..61b7533764 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -5,7 +5,6 @@ import ( "encoding/json" "errors" "fmt" - "reflect" "runtime/trace" "slices" "strconv" @@ -13,12 +12,9 @@ import ( "sync" "time" - obskit "github.com/rudderlabs/rudder-observability-kit/go/labels" - "github.com/google/uuid" "github.com/rudderlabs/rudder-server/enterprise/trackedusers" - "github.com/rudderlabs/rudder-server/utils/timeutil" whutils "github.com/rudderlabs/rudder-server/warehouse/utils" "golang.org/x/sync/errgroup" @@ -90,9 +86,9 @@ type trackedUsersReporter interface { GenerateReportsFromJobs(jobs []*jobsdb.JobT, sourceIdFilter map[string]bool) []*trackedusers.UsersReport } -type warehouseTransformation interface { +type warehouseTransformer interface { transformer.DestinationTransformer - Log(events []types.SingularEventT, metadata *transformer.Metadata) error + CompareAndLog(events []transformer.TransformerEvent, pResponse, wResponse transformer.Response, metadata *transformer.Metadata, eventsByMessageID map[string]types.SingularEventWithReceivedAt) } // Handle is a handle to the processor module @@ -101,7 +97,7 @@ type Handle struct { tracer stats.Tracer backendConfig backendconfig.BackendConfig transformer transformer.Transformer - warehouseTransformer warehouseTransformation + warehouseTransformer warehouseTransformer gatewayDB jobsdb.JobsDB routerDB jobsdb.JobsDB @@ -173,12 +169,6 @@ type Handle struct { enableWarehouseTransformations config.ValueLoader[bool] } - warehouseTransformerStats struct { - responseTime stats.Timer - mismatches stats.Counter - logTime stats.Timer - } - drainConfig struct { jobRunIDs config.ValueLoader[[]string] } @@ -638,9 +628,6 @@ func (proc *Handle) Setup( } proc.warehouseTransformer = wtrans.New(proc.conf, proc.logger, proc.statsFactory) - proc.warehouseTransformerStats.responseTime = proc.statsFactory.NewStat("proc_warehouse_transformations_time", stats.TimerType) - proc.warehouseTransformerStats.mismatches = proc.statsFactory.NewStat("proc_warehouse_transformations_mismatches", stats.CountType) - proc.warehouseTransformerStats.logTime = proc.statsFactory.NewStat("proc_warehouse_transformations_log_time", stats.TimerType) if proc.config.enableDedup { var err error @@ -2668,10 +2655,10 @@ type transformSrcDestOutput struct { func (proc *Handle) transformSrcDest( ctx context.Context, partition string, - // main inputs +// main inputs srcAndDestKey string, eventList []transformer.TransformerEvent, - // helpers +// helpers trackingPlanEnabledMap map[SourceIDT]bool, eventsByMessageID map[string]types.SingularEventWithReceivedAt, uniqueMessageIdsBySrcDestKey map[string]map[string]struct{}, @@ -2928,7 +2915,7 @@ func (proc *Handle) transformSrcDest( proc.logger.Debug("Dest Transform input size", len(eventsToTransform)) s := time.Now() response = proc.transformer.Transform(ctx, eventsToTransform, proc.config.transformBatchSize.Load()) - proc.handleResponseForWarehouseTransformation(ctx, eventsToTransform, response, commonMetaData, eventsByMessageID) + proc.handleWarehouseTransformations(ctx, eventsToTransform, response, commonMetaData, eventsByMessageID) destTransformationStat := proc.newDestinationTransformationStat(sourceID, workspaceID, transformAt, destination) destTransformationStat.transformTime.Since(s) @@ -3087,7 +3074,7 @@ func (proc *Handle) transformSrcDest( } } -func (proc *Handle) handleResponseForWarehouseTransformation( +func (proc *Handle) handleWarehouseTransformations( ctx context.Context, eventsToTransform []transformer.TransformerEvent, pResponse transformer.Response, @@ -3101,52 +3088,8 @@ func (proc *Handle) handleResponseForWarehouseTransformation( return } - transformStartAt := timeutil.Now() wResponse := proc.warehouseTransformer.Transform(ctx, eventsToTransform, proc.config.transformBatchSize.Load()) - proc.warehouseTransformerStats.responseTime.Since(transformStartAt) - - logStartAt := timeutil.Now() - differingEvents := proc.warehouseTransDifferEvents(eventsToTransform, pResponse, wResponse, eventsByMessageID) - if err := proc.warehouseTransformer.Log(differingEvents, commonMetaData); err != nil { - proc.logger.Warnn("Failed to log events for warehouse transformation debugging", obskit.Error(err)) - } - proc.warehouseTransformerStats.logTime.Since(logStartAt) -} - -func (proc *Handle) warehouseTransDifferEvents( - eventsToTransform []transformer.TransformerEvent, - pResponse, wResponse transformer.Response, - eventsByMessageID map[string]types.SingularEventWithReceivedAt, -) []types.SingularEventT { - // If the event counts differ, return all events in the transformation - if len(pResponse.Events) != len(wResponse.Events) || len(pResponse.FailedEvents) != len(wResponse.FailedEvents) { - events := lo.Map(eventsToTransform, func(e transformer.TransformerEvent, _ int) types.SingularEventT { - return eventsByMessageID[e.Metadata.MessageID].SingularEvent - }) - proc.warehouseTransformerStats.mismatches.Count(len(events)) - return events - } - - var ( - differedSampleEvents []types.SingularEventT - differedEventsCount int - ) - - for i := range pResponse.Events { - if reflect.DeepEqual(pResponse.Events[i], wResponse.Events[i]) { - continue - } - - differedEventsCount++ - if len(differedSampleEvents) != 0 { - // Collect the mismatched messages and break (sample only) - differedSampleEvents = append(differedSampleEvents, lo.Map(pResponse.Events[i].Metadata.GetMessagesIDs(), func(msgID string, _ int) types.SingularEventT { - return eventsByMessageID[msgID].SingularEvent - })...) - } - } - proc.warehouseTransformerStats.mismatches.Count(differedEventsCount) - return differedSampleEvents + proc.warehouseTransformer.CompareAndLog(eventsToTransform, pResponse, wResponse, commonMetaData, eventsByMessageID) } func (proc *Handle) saveDroppedJobs(ctx context.Context, droppedJobs []*jobsdb.JobT, tx *Tx) error { diff --git a/warehouse/transformer/datatype.go b/warehouse/transformer/datatype.go index 97508b5973..711b93d98d 100644 --- a/warehouse/transformer/datatype.go +++ b/warehouse/transformer/datatype.go @@ -1,6 +1,8 @@ package transformer import ( + "reflect" + "github.com/rudderlabs/rudder-server/warehouse/internal/model" "github.com/rudderlabs/rudder-server/warehouse/transformer/internal/utils" whutils "github.com/rudderlabs/rudder-server/warehouse/utils" @@ -35,6 +37,8 @@ func primitiveType(val any) string { } func getFloatType(v float64) string { + // JSON unmarshalling treats all numbers as float64 by default, even if they are whole numbers + // So, we need to check if the float is actually an integer if v == float64(int64(v)) { return model.IntDataType } @@ -55,14 +59,14 @@ func dataTypeOverride(destType, key string, val any, isJSONKey bool) string { } func overrideForPostgres(key string, isJSONKey bool) string { - if key == violationErrors || isJSONKey { + if isJSONKey || key == violationErrors { return model.JSONDataType } return model.StringDataType } func overrideForSnowflake(key string, isJSONKey bool) string { - if key == violationErrors || isJSONKey { + if isJSONKey || key == violationErrors { return model.JSONDataType } return model.StringDataType @@ -75,10 +79,15 @@ func overrideForRedshift(val any, isJSONKey bool) string { if val == nil { return model.StringDataType } - if jsonVal, _ := json.Marshal(val); len(jsonVal) > redshiftStringLimit { - return model.TextDataType + switch reflect.TypeOf(val).Kind() { + case reflect.Slice, reflect.Array: + if jsonVal, _ := json.Marshal(val); len(jsonVal) > redshiftStringLimit { + return model.TextDataType + } + return model.StringDataType + default: + return model.StringDataType } - return model.StringDataType } func convertValIfDateTime(val any, colType string) any { diff --git a/warehouse/transformer/internal/snakecase/snakecase.go b/warehouse/transformer/internal/snakecase/snakecase.go index db8f355e32..88df70ac8c 100644 --- a/warehouse/transformer/internal/snakecase/snakecase.go +++ b/warehouse/transformer/internal/snakecase/snakecase.go @@ -9,65 +9,65 @@ import ( const ( // Used to compose unicode character classes. - rsAstralRange = "\\ud800-\\udfff" - rsComboMarksRange = "\\u0300-\\u036f" - reComboHalfMarksRange = "\\ufe20-\\ufe2f" - rsComboSymbolsRange = "\\u20d0-\\u20ff" - rsComboMarksExtendedRange = "\\u1ab0-\\u1aff" - rsComboMarksSupplementRange = "\\u1dc0-\\u1dff" - rsComboRange = rsComboMarksRange + reComboHalfMarksRange + rsComboSymbolsRange + rsComboMarksExtendedRange + rsComboMarksSupplementRange - rsDingbatRange = "\\u2700-\\u27bf" - rsLowerRange = "a-z\\xdf-\\xf6\\xf8-\\xff" - rsMathOpRange = "\\xac\\xb1\\xd7\\xf7" - rsNonCharRange = "\\x00-\\x2f\\x3a-\\x40\\x5b-\\x60\\x7b-\\xbf" - rsPunctuationRange = "\\u2000-\\u206f" - rsSpaceRange = " \\t\\x0b\\f\\xa0\\ufeff\\n\\r\\u2028\\u2029\\u1680\\u180e\\u2000\\u2001\\u2002\\u2003\\u2004\\u2005\\u2006\\u2007\\u2008\\u2009\\u200a\\u202f\\u205f\\u3000" - rsUpperRange = "A-Z\\xc0-\\xd6\\xd8-\\xde" - rsVarRange = "\\ufe0e\\ufe0f" - rsBreakRange = rsMathOpRange + rsNonCharRange + rsPunctuationRange + rsSpaceRange + astralRange = "\\ud800-\\udfff" + comboMarksRange = "\\u0300-\\u036f" + comboHalfMarksRange = "\\ufe20-\\ufe2f" + comboSymbolsRange = "\\u20d0-\\u20ff" + comboMarksExtendedRange = "\\u1ab0-\\u1aff" + comboMarksSupplementRange = "\\u1dc0-\\u1dff" + comboRange = comboMarksRange + comboHalfMarksRange + comboSymbolsRange + comboMarksExtendedRange + comboMarksSupplementRange + dingbatRange = "\\u2700-\\u27bf" + lowerRange = "a-z\\xdf-\\xf6\\xf8-\\xff" + mathOpRange = "\\xac\\xb1\\xd7\\xf7" + nonCharRange = "\\x00-\\x2f\\x3a-\\x40\\x5b-\\x60\\x7b-\\xbf" + punctuationRange = "\\u2000-\\u206f" + spaceRange = " \\t\\x0b\\f\\xa0\\ufeff\\n\\r\\u2028\\u2029\\u1680\\u180e\\u2000\\u2001\\u2002\\u2003\\u2004\\u2005\\u2006\\u2007\\u2008\\u2009\\u200a\\u202f\\u205f\\u3000" + upperRange = "A-Z\\xc0-\\xd6\\xd8-\\xde" + varRange = "\\ufe0e\\ufe0f" + breakRange = mathOpRange + nonCharRange + punctuationRange + spaceRange // Used to compose unicode capture groups - rsApos = "['\u2019]" - rsBreak = "[" + rsBreakRange + "]" - rsCombo = "[" + rsComboRange + "]" - rsDigit = "\\d" - rsDingbat = "[" + rsDingbatRange + "]" - rsLower = "[" + rsLowerRange + "]" - rsMisc = "[^" + rsAstralRange + rsBreakRange + rsDigit + rsDingbatRange + rsLowerRange + rsUpperRange + "]" - rsFitz = "\\ud83c[\\udffb-\\udfff]" - rsModifier = "(?:" + rsCombo + "|" + rsFitz + ")" - rsNonAstral = "[^" + rsAstralRange + "]" - rsRegional = "(?:\\ud83c[\\udde6-\\uddff]){2}" - rsSurrPair = "[\\ud800-\\udbff][\\udc00-\\udfff]" - rsUpper = "[" + rsUpperRange + "]" - rsZWJ = "\\u200d" + apos = "['\u2019]" + breakExp = "[" + breakRange + "]" + combo = "[" + comboRange + "]" + digit = "\\d" + dingbat = "[" + dingbatRange + "]" + lower = "[" + lowerRange + "]" + misc = "[^" + astralRange + breakRange + digit + dingbatRange + lowerRange + upperRange + "]" + fitz = "\\ud83c[\\udffb-\\udfff]" + modifier = "(?:" + combo + "|" + fitz + ")" + nonAstral = "[^" + astralRange + "]" + regional = "(?:\\ud83c[\\udde6-\\uddff]){2}" + surrPair = "[\\ud800-\\udbff][\\udc00-\\udfff]" + upper = "[" + upperRange + "]" + zwj = "\\u200d" // Used to compose unicode regexes - rsMiscLower = "(?:" + rsLower + "|" + rsMisc + ")" - rsMiscUpper = "(?:" + rsUpper + "|" + rsMisc + ")" - rsOptContrLower = "(?:" + rsApos + "(?:d|ll|m|re|s|t|ve))?" - rsOptContrUpper = "(?:" + rsApos + "(?:D|LL|M|RE|S|T|VE))?" - reOptMod = rsModifier + "?" - rsOptVar = "[" + rsVarRange + "]?" - rsOptJoin = "(?:" + rsZWJ + "(?:" + rsNonAstral + "|" + rsRegional + "|" + rsSurrPair + ")" + rsOptVar + reOptMod + ")*" - rsOrdLower = "\\d*(?:1st|2nd|3rd|(?![123])\\dth)(?=\\b|[A-Z_])" - rsOrdUpper = "\\d*(?:1ST|2ND|3RD|(?![123])\\dTH)(?=\\b|[a-z_])" - rsSeq = rsOptVar + reOptMod + rsOptJoin - rsEmoji = "(?:" + rsDingbat + "|" + rsRegional + "|" + rsSurrPair + ")" + rsSeq + miscLower = "(?:" + lower + "|" + misc + ")" + miscUpper = "(?:" + upper + "|" + misc + ")" + optContrLower = "(?:" + apos + "(?:d|ll|m|re|s|t|ve))?" + optContrUpper = "(?:" + apos + "(?:D|LL|M|RE|S|T|VE))?" + optMod = modifier + "?" + optVar = "[" + varRange + "]?" + optJoin = "(?:" + zwj + "(?:" + nonAstral + "|" + regional + "|" + surrPair + ")" + optVar + optMod + ")*" + ordLower = "\\d*(?:1st|2nd|3rd|(?![123])\\dth)(?=\\b|[A-Z_])" + ordUpper = "\\d*(?:1ST|2ND|3RD|(?![123])\\dTH)(?=\\b|[a-z_])" + seq = optVar + optMod + optJoin + emoji = "(?:" + dingbat + "|" + regional + "|" + surrPair + ")" + seq ) var ( reUnicodeWords = regexp2.MustCompile( strings.Join( []string{ - rsUpper + "?" + rsLower + "+" + rsOptContrLower + "(?=" + rsBreak + "|" + rsUpper + "|" + "$)", // Regular words, lowercase letters followed by optional contractions - rsMiscUpper + "+" + rsOptContrUpper + "(?=" + rsBreak + "|" + rsUpper + rsMiscLower + "|" + "$)", // Miscellaneous uppercase characters with optional contractions - rsUpper + "?" + rsMiscLower + "+" + rsOptContrLower, // Miscellaneous lowercase sequences with optional contractions - rsUpper + "+" + rsOptContrUpper, // All uppercase words with optional contractions (e.g., "THIS") - rsOrdUpper, // Ordinals for uppercase (e.g., "1ST", "2ND") - rsOrdLower, // Ordinals for lowercase (e.g., "1st", "2nd") - rsDigit + "+", // Pure digits (e.g., "123") - rsEmoji, // Emojis (e.g., 😀, ❤️) + upper + "?" + lower + "+" + optContrLower + "(?=" + breakExp + "|" + upper + "|" + "$)", // Regular words, lowercase letters followed by optional contractions + miscUpper + "+" + optContrUpper + "(?=" + breakExp + "|" + upper + miscLower + "|" + "$)", // Miscellaneous uppercase characters with optional contractions + upper + "?" + miscLower + "+" + optContrLower, // Miscellaneous lowercase sequences with optional contractions + upper + "+" + optContrUpper, // All uppercase words with optional contractions (e.g., "THIS") + ordUpper, // Ordinals for uppercase (e.g., "1ST", "2ND") + ordLower, // Ordinals for lowercase (e.g., "1st", "2nd") + digit + "+", // Pure digits (e.g., "123") + emoji, // Emojis (e.g., 😀, ❤️) }, "|", ), @@ -76,18 +76,18 @@ var ( reUnicodeWordsWithNumbers = regexp2.MustCompile( strings.Join( []string{ - rsUpper + "?" + rsLower + "+" + rsDigit + "+", // Lowercase letters followed by digits (e.g., "abc123") - rsUpper + "+" + rsDigit + "+", // Uppercase letters followed by digits (e.g., "ABC123") - rsDigit + "+" + rsUpper + "?" + rsLower + "+", // Digits followed by lowercase letters (e.g., "123abc") - rsDigit + "+" + rsUpper + "+", // Digits followed by uppercase letters (e.g., "123ABC") - rsUpper + "?" + rsLower + "+" + rsOptContrLower + "(?=" + rsBreak + "|" + rsUpper + "|" + "$)", // Regular words, lowercase letters followed by optional contractions - rsMiscUpper + "+" + rsOptContrUpper + "(?=" + rsBreak + "|" + rsUpper + rsMiscLower + "|" + "$)", // Miscellaneous uppercase characters with optional contractions - rsUpper + "?" + rsMiscLower + "+" + rsOptContrLower, // Miscellaneous lowercase sequences with optional contractions - rsUpper + "+" + rsOptContrUpper, // All uppercase words with optional contractions (e.g., "THIS") - rsOrdUpper, // Ordinals for uppercase (e.g., "1ST", "2ND") - rsOrdLower, // Ordinals for lowercase (e.g., "1st", "2nd") - rsDigit + "+", // Pure digits (e.g., "123") - rsEmoji, // Emojis (e.g., 😀, ❤️) + upper + "?" + lower + "+" + digit + "+", // Lowercase letters followed by digits (e.g., "abc123") + upper + "+" + digit + "+", // Uppercase letters followed by digits (e.g., "ABC123") + digit + "+" + upper + "?" + lower + "+", // Digits followed by lowercase letters (e.g., "123abc") + digit + "+" + upper + "+", // Digits followed by uppercase letters (e.g., "123ABC") + upper + "?" + lower + "+" + optContrLower + "(?=" + breakExp + "|" + upper + "|" + "$)", // Regular words, lowercase letters followed by optional contractions + miscUpper + "+" + optContrUpper + "(?=" + breakExp + "|" + upper + miscLower + "|" + "$)", // Miscellaneous uppercase characters with optional contractions + upper + "?" + miscLower + "+" + optContrLower, // Miscellaneous lowercase sequences with optional contractions + upper + "+" + optContrUpper, // All uppercase words with optional contractions (e.g., "THIS") + ordUpper, // Ordinals for uppercase (e.g., "1ST", "2ND") + ordLower, // Ordinals for lowercase (e.g., "1st", "2nd") + digit + "+", // Pure digits (e.g., "123") + emoji, // Emojis (e.g., 😀, ❤️) }, "|", ), diff --git a/warehouse/transformer/internal/snakecase/snakecase_test.go b/warehouse/transformer/internal/snakecase/snakecase_test.go index 6a8a18c391..7964919ffb 100644 --- a/warehouse/transformer/internal/snakecase/snakecase_test.go +++ b/warehouse/transformer/internal/snakecase/snakecase_test.go @@ -101,7 +101,7 @@ func TestToSnakeCase(t *testing.T) { } }) }) - t.Run("extractWords", func(t *testing.T) { + t.Run("extractWordsWithNumbers", func(t *testing.T) { t.Run("should match words containing Latin Unicode letters", func(t *testing.T) { for _, letter := range burredLetters { require.Equal(t, []string{string(letter)}, extractWordsWithNumbers(string(letter))) diff --git a/warehouse/transformer/jsonpaths.go b/warehouse/transformer/jsonpaths.go index f55c08f748..9729d95476 100644 --- a/warehouse/transformer/jsonpaths.go +++ b/warehouse/transformer/jsonpaths.go @@ -9,17 +9,20 @@ import ( func extractJSONPathInfo(jsonPaths []string) jsonPathInfo { keysMap, legacyKeysMap := make(map[string]int), make(map[string]int) for _, jsonPath := range jsonPaths { - if trimmedJSONPath := strings.TrimSpace(jsonPath); trimmedJSONPath != "" { - splitPaths := strings.Split(jsonPath, ".") - key := strings.Join(splitPaths, "_") - pos := len(splitPaths) - 1 + trimmedJSONPath := strings.TrimSpace(jsonPath) + if trimmedJSONPath == "" { + continue + } + + splitPaths := strings.Split(trimmedJSONPath, ".") + key := strings.Join(splitPaths, "_") + pos := len(splitPaths) - 1 - if utils.HasJSONPathPrefix(jsonPath) { - keysMap[key] = pos - continue - } - legacyKeysMap[key] = pos + if utils.HasJSONPathPrefix(trimmedJSONPath) { + keysMap[key] = pos + continue } + legacyKeysMap[key] = pos } return jsonPathInfo{keysMap, legacyKeysMap} } diff --git a/warehouse/transformer/jsonpaths_test.go b/warehouse/transformer/jsonpaths_test.go index 69b3e39053..b9bc616a38 100644 --- a/warehouse/transformer/jsonpaths_test.go +++ b/warehouse/transformer/jsonpaths_test.go @@ -30,9 +30,9 @@ func TestExtractJSONPathInfo(t *testing.T) { }, { name: "Whitespace and empty path", - jsonPaths: []string{" ", "track.properties.name", ""}, + jsonPaths: []string{" ", "track.properties.name", "", " track.properties.age "}, expected: jsonPathInfo{ - keysMap: map[string]int{"track_properties_name": 2}, + keysMap: map[string]int{"track_properties_name": 2, "track_properties_age": 2}, legacyKeysMap: make(map[string]int), }, }, diff --git a/warehouse/transformer/logger.go b/warehouse/transformer/logger.go index 271c771857..efc2b460d8 100644 --- a/warehouse/transformer/logger.go +++ b/warehouse/transformer/logger.go @@ -2,10 +2,13 @@ package transformer import ( "fmt" + "reflect" "github.com/google/uuid" "github.com/samber/lo" + obskit "github.com/rudderlabs/rudder-observability-kit/go/labels" + "github.com/rudderlabs/rudder-go-kit/logger" "github.com/rudderlabs/rudder-go-kit/stringify" @@ -14,34 +17,82 @@ import ( "github.com/rudderlabs/rudder-server/utils/types" ) -func (t *Transformer) Log(events []types.SingularEventT, metadata *ptrans.Metadata) error { +func (t *Transformer) CompareAndLog( + events []ptrans.TransformerEvent, + pResponse, wResponse ptrans.Response, + metadata *ptrans.Metadata, + eventsByMessageID map[string]types.SingularEventWithReceivedAt, +) { if len(events) == 0 { - return nil + return } + t.loggedEventsMu.Lock() defer t.loggedEventsMu.Unlock() if t.loggedEvents >= int64(t.config.maxLoggedEvents.Load()) { - return nil + return + } + + t.stats.comparisionTime.RecordDuration()() + + differingEvents := t.differingEvents(events, pResponse, wResponse, eventsByMessageID) + if len(differingEvents) == 0 { + return } - logEntries := lo.Map(events, func(item types.SingularEventT, index int) string { + logEntries := lo.Map(differingEvents, func(item types.SingularEventT, index int) string { return stringify.Any(ptrans.TransformerEvent{ Message: item, Metadata: *metadata, }) }) if err := t.writeLogEntries(logEntries); err != nil { - return fmt.Errorf("logging events: %w", err) + t.logger.Warnn("Error logging events", obskit.Error(err)) + return } t.logger.Infon("Successfully logged events", logger.NewIntField("event_count", int64(len(logEntries)))) t.loggedEvents += int64(len(logEntries)) - return nil +} + +func (t *Transformer) differingEvents( + eventsToTransform []ptrans.TransformerEvent, + pResponse, wResponse ptrans.Response, + eventsByMessageID map[string]types.SingularEventWithReceivedAt, +) []types.SingularEventT { + // If the event counts differ, return all events in the transformation + if len(pResponse.Events) != len(wResponse.Events) || len(pResponse.FailedEvents) != len(wResponse.FailedEvents) { + events := lo.Map(eventsToTransform, func(e ptrans.TransformerEvent, _ int) types.SingularEventT { + return eventsByMessageID[e.Metadata.MessageID].SingularEvent + }) + t.stats.mismatchedEvents.Count(len(events)) + return events + } + + var ( + differedSampleEvents []types.SingularEventT + differedEventsCount int + ) + + for i := range pResponse.Events { + if reflect.DeepEqual(pResponse.Events[i], wResponse.Events[i]) { + continue + } + if differedEventsCount == 0 { + // Collect the mismatched messages and break (sample only) + differedSampleEvents = append(differedSampleEvents, lo.Map(pResponse.Events[i].Metadata.GetMessagesIDs(), func(msgID string, _ int) types.SingularEventT { + return eventsByMessageID[msgID].SingularEvent + })...) + } + differedEventsCount++ + } + t.stats.mismatchedEvents.Count(differedEventsCount) + return differedSampleEvents } func (t *Transformer) writeLogEntries(entries []string) error { - writer, err := misc.CreateBufferedWriter(t.loggedFileName) + writer, err := misc.CreateGZ(t.loggedFileName) if err != nil { return fmt.Errorf("creating buffered writer: %w", err) } diff --git a/warehouse/transformer/safe.go b/warehouse/transformer/safe.go index b416bd72a0..679955a401 100644 --- a/warehouse/transformer/safe.go +++ b/warehouse/transformer/safe.go @@ -17,6 +17,10 @@ import ( whutils "github.com/rudderlabs/rudder-server/warehouse/utils" ) +const ( + postgresMaxIdentifierLength = 63 +) + var ( reLeadingUnderscores = regexp.MustCompile(`^_*`) reNonAlphanumericOrDollar = regexp.MustCompile(`[^a-zA-Z0-9\\$]`) @@ -142,7 +146,7 @@ func safeName(destType string, intrOpts *intrOptions, name string) string { case whutils.SNOWFLAKE, whutils.SnowpipeStreaming: name = strings.ToUpper(name) case whutils.POSTGRES: - name = misc.TruncateStr(name, 63) + name = misc.TruncateStr(name, postgresMaxIdentifierLength) name = strings.ToLower(name) default: name = strings.ToLower(name) @@ -245,7 +249,7 @@ func transformColumnName(destType string, intrOpts *intrOptions, destOpts *destO name = "_" + name } if destType == whutils.POSTGRES { - name = misc.TruncateStr(name, 63) + name = misc.TruncateStr(name, postgresMaxIdentifierLength) } return name } @@ -259,7 +263,7 @@ func startsWithDigit(name string) bool { // transformNameToBlendoCase converts the input string into Blendo case format by replacing non-alphanumeric characters with underscores. // If the name does not start with a letter or underscore, it adds a leading underscore. -// The name is truncated to 63 characters for Postgres, and the result is converted to lowercase. +// The name is truncated to postgresMaxIdentifierLength characters for Postgres, and the result is converted to lowercase. func transformNameToBlendoCase(destType, name string) string { key := reNonAlphanumericOrDollar.ReplaceAllString(name, "_") @@ -267,7 +271,7 @@ func transformNameToBlendoCase(destType, name string) string { key = "_" + key } if destType == whutils.POSTGRES { - key = misc.TruncateStr(name, 63) + key = misc.TruncateStr(name, postgresMaxIdentifierLength) } return strings.ToLower(key) } diff --git a/warehouse/transformer/transformer.go b/warehouse/transformer/transformer.go index e183a1daba..b745b1cca7 100644 --- a/warehouse/transformer/transformer.go +++ b/warehouse/transformer/transformer.go @@ -36,6 +36,9 @@ func New(conf *config.Config, logger logger.Logger, statsFactory stats.Stats) *T loggedFileName: generateLogFileName(), } + t.stats.mismatchedEvents = t.statsFactory.NewStat("warehouse_dest_transform_mismatched_events", stats.CountType) + t.stats.comparisionTime = t.statsFactory.NewStat("warehouse_dest_transform_comparison_time", stats.TimerType) + t.config.enableIDResolution = conf.GetReloadableBoolVar(false, "Warehouse.enableIDResolution") t.config.populateSrcDestInfoInContext = conf.GetReloadableBoolVar(true, "WH_POPULATE_SRC_DEST_INFO_IN_CONTEXT") t.config.maxColumnsInEvent = conf.GetReloadableIntVar(200, 1, "WH_MAX_COLUMNS_IN_EVENT") @@ -180,10 +183,7 @@ func (t *Transformer) getColumns( // uuid_ts and loaded_at datatypes are passed from here to create appropriate columns. // Corresponding values are inserted when loading into the warehouse - uuidTS := "uuid_ts" - if destType == whutils.SNOWFLAKE || destType == whutils.SnowpipeStreaming { - uuidTS = "UUID_TS" - } + uuidTS := whutils.ToProviderCase(destType, "uuid_ts") columns[uuidTS] = "datetime" if destType == whutils.BQ { diff --git a/warehouse/transformer/transformer_test.go b/warehouse/transformer/transformer_test.go index 4255f8ebb3..a6ce78432c 100644 --- a/warehouse/transformer/transformer_test.go +++ b/warehouse/transformer/transformer_test.go @@ -1,14 +1,21 @@ package transformer import ( + "compress/gzip" "fmt" + "io" "net/http" + "os" + "strconv" + "strings" "testing" "github.com/ory/dockertest/v3" "github.com/samber/lo" "github.com/stretchr/testify/require" + "github.com/rudderlabs/rudder-go-kit/stats/memstats" + "github.com/rudderlabs/rudder-go-kit/config" "github.com/rudderlabs/rudder-go-kit/logger" "github.com/rudderlabs/rudder-go-kit/stats" @@ -16,6 +23,7 @@ import ( backendconfig "github.com/rudderlabs/rudder-server/backend-config" ptrans "github.com/rudderlabs/rudder-server/processor/transformer" + "github.com/rudderlabs/rudder-server/utils/types" "github.com/rudderlabs/rudder-server/warehouse/transformer/internal/response" "github.com/rudderlabs/rudder-server/warehouse/transformer/testhelper" whutils "github.com/rudderlabs/rudder-server/warehouse/utils" @@ -617,6 +625,103 @@ func getTrackMetadata(destinationType, sourceCategory string) ptrans.Metadata { return metadata } +func TestTransformer_CompareAndLog(t *testing.T) { + tmpFile, err := os.CreateTemp("", "transformer_compare_log.*.txt") + require.NoError(t, err) + require.NoError(t, tmpFile.Close()) + + maxLoggedEvents := 10 + + c := config.New() + c.Set("Warehouse.maxLoggedEvents", maxLoggedEvents) + + statsStore, err := memstats.New() + require.NoError(t, err) + + trans := New(c, logger.NOP, statsStore) + trans.loggedFileName = tmpFile.Name() + + metadata := &ptrans.Metadata{ + SourceID: "sourceID", + DestinationID: "destinationID", + SourceType: "sourceType", + DestinationType: "destinationType", + } + + eventsByMessageID := make(map[string]types.SingularEventWithReceivedAt, 50) + for index := 0; index < 50; index++ { + eventsByMessageID[strconv.Itoa(index)] = types.SingularEventWithReceivedAt{ + SingularEvent: map[string]interface{}{ + "event": "track" + strconv.Itoa(index), + }, + } + } + + events := []ptrans.TransformerEvent{ + { + Message: types.SingularEventT{ + "event": "track", + "context": "context", + "properties": "properties", + }, + }, + } + + for i := 0; i < 1000; i++ { + pResponse := ptrans.Response{ + Events: lo.RepeatBy(50, func(index int) ptrans.TransformerResponse { + return ptrans.TransformerResponse{ + Output: types.SingularEventT{ + "event": "track" + strconv.Itoa(index+i), + }, + Metadata: ptrans.Metadata{ + MessageID: strconv.Itoa(index + i), + SourceID: "sourceID", + DestinationID: "destinationID", + SourceType: "sourceType", + DestinationType: "destinationType", + }, + } + }), + } + wResponse := ptrans.Response{ + Events: lo.RepeatBy(50, func(index int) ptrans.TransformerResponse { + return ptrans.TransformerResponse{ + Output: types.SingularEventT{ + "event": "track" + strconv.Itoa(index+i+1), + }, + Metadata: ptrans.Metadata{ + MessageID: strconv.Itoa(index + i + 1), + SourceID: "sourceID", + DestinationID: "destinationID", + SourceType: "sourceType", + DestinationType: "destinationType", + }, + } + }), + } + + trans.CompareAndLog(events, pResponse, wResponse, metadata, eventsByMessageID) + } + + f, err := os.OpenFile(tmpFile.Name(), os.O_RDWR, 0o644) + require.NoError(t, err) + gzipReader, err := gzip.NewReader(f) + require.NoError(t, err) + data, err := io.ReadAll(gzipReader) + require.NoError(t, err) + require.NoError(t, gzipReader.Close()) + require.NoError(t, f.Close()) + + differingEvents := strings.Split(strings.Trim(string(data), "\n"), "\n") + require.Len(t, differingEvents, maxLoggedEvents) + + for i := 0; i < maxLoggedEvents; i++ { + require.Contains(t, differingEvents[i], "track"+strconv.Itoa(i)) + } + require.EqualValues(t, maxLoggedEvents*50, statsStore.Get("warehouse_dest_transform_mismatched_events", stats.Tags{}).LastValue()) +} + func TestTransformer_GetColumns(t *testing.T) { testCases := []struct { name string @@ -697,8 +802,10 @@ func TestTransformer_GetColumns(t *testing.T) { } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - trans := &Transformer{} - trans.config.maxColumnsInEvent = config.SingleValueLoader(int(tc.maxColumns)) + c := config.New() + c.Set("WH_MAX_COLUMNS_IN_EVENT", tc.maxColumns) + + trans := New(c, logger.NOP, stats.NOP) columns, err := trans.getColumns(tc.destType, tc.data, tc.columnTypes) if tc.wantError { diff --git a/warehouse/transformer/types.go b/warehouse/transformer/types.go index e121bfaa7a..465beeccbb 100644 --- a/warehouse/transformer/types.go +++ b/warehouse/transformer/types.go @@ -18,6 +18,10 @@ type ( logger logger.Logger statsFactory stats.Stats + stats struct { + comparisionTime stats.Timer + mismatchedEvents stats.Counter + } config struct { enableIDResolution config.ValueLoader[bool] populateSrcDestInfoInContext config.ValueLoader[bool]