Skip to content

Commit

Permalink
Feat.warehouse transformer fuzz (#5206)
Browse files Browse the repository at this point in the history
  • Loading branch information
lvrach authored and achettyiitr committed Oct 21, 2024
1 parent 98f7827 commit 9f36c96
Show file tree
Hide file tree
Showing 36 changed files with 5,254 additions and 7,735 deletions.
73 changes: 69 additions & 4 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@ import (
"encoding/json"
"errors"
"fmt"
"reflect"
"runtime/trace"
"slices"
"strconv"
"strings"
"sync"
"time"

obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"

"github.com/rudderlabs/rudder-server/enterprise/trackedusers"

"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -55,6 +58,7 @@ import (
. "github.com/rudderlabs/rudder-server/utils/tx" //nolint:staticcheck
"github.com/rudderlabs/rudder-server/utils/types"
"github.com/rudderlabs/rudder-server/utils/workerpool"
wtrans "github.com/rudderlabs/rudder-server/warehouse/transformer"
)

const (
Expand Down Expand Up @@ -84,10 +88,12 @@ type trackedUsersReporter interface {

// Handle is a handle to the processor module
type Handle struct {
conf *config.Config
tracer stats.Tracer
backendConfig backendconfig.BackendConfig
transformer transformer.Transformer
conf *config.Config
tracer stats.Tracer
backendConfig backendconfig.BackendConfig
transformer transformer.Transformer
warehouseTransformer transformer.DestinationTransformer
warehouseDebugLogger *wtrans.DebugLogger

gatewayDB jobsdb.JobsDB
routerDB jobsdb.JobsDB
Expand Down Expand Up @@ -156,6 +162,7 @@ type Handle struct {
eventAuditEnabled map[string]bool
credentialsMap map[string][]transformer.Credential
nonEventStreamSources map[string]bool
enableWarehouseTransformations config.ValueLoader[bool]
}

drainConfig struct {
Expand Down Expand Up @@ -615,6 +622,9 @@ func (proc *Handle) Setup(
"partition": partition,
})
}
proc.warehouseTransformer = wtrans.New(proc.conf, proc.logger, proc.statsFactory)
proc.warehouseDebugLogger = wtrans.NewDebugLogger(proc.conf, proc.logger)

if proc.config.enableDedup {
var err error
proc.dedup, err = dedup.New(proc.conf, proc.statsFactory)
Expand Down Expand Up @@ -815,6 +825,7 @@ func (proc *Handle) loadReloadableConfig(defaultPayloadLimit int64, defaultMaxEv
proc.config.archivalEnabled = config.GetReloadableBoolVar(true, "archival.Enabled")
// Capture event name as a tag in event level stats
proc.config.captureEventNameStats = config.GetReloadableBoolVar(false, "Processor.Stats.captureEventName")
proc.config.enableWarehouseTransformations = config.GetReloadableBoolVar(false, "Processor.enableWarehouseTransformations")
}

type connection struct {
Expand Down Expand Up @@ -2765,6 +2776,7 @@ func (proc *Handle) transformSrcDest(
proc.logger.Debug("Dest Transform input size", len(eventsToTransform))
s := time.Now()
response = proc.transformer.Transform(ctx, eventsToTransform, proc.config.transformBatchSize.Load())
proc.handleResponseForWarehouseTransformation(ctx, eventsToTransform, response, commonMetaData, eventsByMessageID)

destTransformationStat := proc.newDestinationTransformationStat(sourceID, workspaceID, transformAt, destination)
destTransformationStat.transformTime.Since(s)
Expand Down Expand Up @@ -2923,6 +2935,59 @@ func (proc *Handle) transformSrcDest(
}
}

func (proc *Handle) handleResponseForWarehouseTransformation(
ctx context.Context,
eventsToTransform []transformer.TransformerEvent,
pResponse transformer.Response,
commonMetaData *transformer.Metadata,
eventsByMessageID map[string]types.SingularEventWithReceivedAt,
) {
if len(eventsToTransform) == 0 || !proc.config.enableWarehouseTransformations.Load() {
return
}
defer proc.statsFactory.NewStat("proc_warehouse_transformations_time", stats.TimerType).RecordDuration()()

wResponse := proc.warehouseTransformer.Transform(ctx, eventsToTransform, proc.config.transformBatchSize.Load())
responsesDiffer := proc.responsesDiffer(eventsToTransform, pResponse, wResponse, eventsByMessageID)
if len(responsesDiffer) == 0 {
return
}
if err := proc.warehouseDebugLogger.LogEvents(responsesDiffer, commonMetaData); err != nil {
proc.logger.Warnn("Failed to log events for warehouse transformation debugging", obskit.Error(err))
}
}

func (proc *Handle) responsesDiffer(
eventsToTransform []transformer.TransformerEvent,
pResponse, wResponse transformer.Response,
eventsByMessageID map[string]types.SingularEventWithReceivedAt,
) (messages []types.SingularEventT) {
if len(pResponse.Events) != len(wResponse.Events) || len(pResponse.FailedEvents) != len(wResponse.FailedEvents) {
return lo.Map(eventsToTransform, func(e transformer.TransformerEvent, _ int) types.SingularEventT {
return eventsByMessageID[e.Metadata.MessageID].SingularEvent
})
}
for i := range pResponse.Events {
if reflect.DeepEqual(pResponse.Events[i], wResponse.Events[i]) {
continue
}
messages = append(messages, lo.Map(pResponse.Events[i].Metadata.GetMessagesIDs(), func(msgID string, _ int) types.SingularEventT {
return eventsByMessageID[msgID].SingularEvent
})...)
}
for i := range pResponse.FailedEvents {
wResponse.FailedEvents[i].Error = pResponse.FailedEvents[i].Error

if reflect.DeepEqual(pResponse.FailedEvents[i], wResponse.FailedEvents[i]) {
continue
}
messages = append(messages, lo.Map(pResponse.FailedEvents[i].Metadata.GetMessagesIDs(), func(msgID string, _ int) types.SingularEventT {
return eventsByMessageID[msgID].SingularEvent
})...)
}
return
}

func (proc *Handle) saveDroppedJobs(ctx context.Context, droppedJobs []*jobsdb.JobT, tx *Tx) error {
if len(droppedJobs) > 0 {
for i := range droppedJobs { // each dropped job should have a unique jobID in the scope of the batch
Expand Down
Loading

0 comments on commit 9f36c96

Please sign in to comment.