From eb3998cf26cf3b69ead7279e4ec5d4bf1215bd98 Mon Sep 17 00:00:00 2001 From: "siddarth.msv" Date: Tue, 21 Jan 2025 22:03:44 +0530 Subject: [PATCH] chore: generateTransformationMessage is its own concurrent step --- processor/processor.go | 73 +++++++++++++++++++------------------ processor/processor_test.go | 28 +++++++------- processor/worker.go | 25 +++++++++---- processor/worker_handle.go | 3 +- processor/worker_test.go | 15 ++++++-- 5 files changed, 83 insertions(+), 61 deletions(-) diff --git a/processor/processor.go b/processor/processor.go index 32032311d6..4f20e540ac 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -1658,7 +1658,7 @@ type preTransformationMessage struct { dedupKeys map[string]struct{} } -func (proc *Handle) processJobsForDest(partition string, subJobs subJob) (*transformationMessage, error) { +func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *preTransformationMessage { if proc.limiter.preprocess != nil { defer proc.limiter.preprocess.BeginWithPriority(partition, proc.getLimiterPriority(partition))() } @@ -1734,7 +1734,7 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) (*trans for _, batchEvent := range jobList { var eventParams types.EventParams if err := jsonfast.Unmarshal(batchEvent.Parameters, &eventParams); err != nil { - return nil, err + panic(err) } var span stats.TraceSpan @@ -1837,7 +1837,7 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) (*trans if proc.config.enableDedup { keyMap, err = proc.dedup.GetBatch(dedupKeysWithWorkspaceID) if err != nil { - return nil, err + panic(err) } } for _, event := range jobsWithMetaData { @@ -1983,33 +1983,32 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) (*trans panic(fmt.Errorf("len(statusList):%d != len(jobList):%d", len(statusList), len(jobList))) } - return proc.generateTransformationMessage( - &preTransformationMessage{ - partition: partition, - subJobs: subJobs, - 
eventSchemaJobs: eventSchemaJobs, - archivalJobs: archivalJobs, - connectionDetailsMap: connectionDetailsMap, - statusDetailsMap: statusDetailsMap, - reportMetrics: reportMetrics, - destFilterStatusDetailMap: destFilterStatusDetailMap, - inCountMetadataMap: inCountMetadataMap, - inCountMap: inCountMap, - outCountMap: outCountMap, - totalEvents: totalEvents, - marshalStart: marshalStart, - groupedEventsBySourceId: groupedEventsBySourceId, - eventsByMessageID: eventsByMessageID, - procErrorJobs: procErrorJobs, - jobIDToSpecificDestMapOnly: jobIDToSpecificDestMapOnly, - groupedEvents: groupedEvents, - uniqueMessageIdsBySrcDestKey: uniqueMessageIdsBySrcDestKey, - statusList: statusList, - jobList: jobList, - start: start, - sourceDupStats: sourceDupStats, - dedupKeys: dedupKeys, - }) + return &preTransformationMessage{ + partition: partition, + subJobs: subJobs, + eventSchemaJobs: eventSchemaJobs, + archivalJobs: archivalJobs, + connectionDetailsMap: connectionDetailsMap, + statusDetailsMap: statusDetailsMap, + reportMetrics: reportMetrics, + destFilterStatusDetailMap: destFilterStatusDetailMap, + inCountMetadataMap: inCountMetadataMap, + inCountMap: inCountMap, + outCountMap: outCountMap, + totalEvents: totalEvents, + marshalStart: marshalStart, + groupedEventsBySourceId: groupedEventsBySourceId, + eventsByMessageID: eventsByMessageID, + procErrorJobs: procErrorJobs, + jobIDToSpecificDestMapOnly: jobIDToSpecificDestMapOnly, + groupedEvents: groupedEvents, + uniqueMessageIdsBySrcDestKey: uniqueMessageIdsBySrcDestKey, + statusList: statusList, + jobList: jobList, + start: start, + sourceDupStats: sourceDupStats, + dedupKeys: dedupKeys, + } } func (proc *Handle) generateTransformationMessage(preTrans *preTransformationMessage) (*transformationMessage, error) { @@ -3283,11 +3282,15 @@ func (proc *Handle) handlePendingGatewayJobs(partition string) bool { var transMessage *transformationMessage var err error - transMessage, err = proc.processJobsForDest(partition, subJob{ - 
subJobs: unprocessedList.Jobs, - hasMore: false, - rsourcesStats: rsourcesStats, - }) + transMessage, err = proc.generateTransformationMessage(proc.processJobsForDest( + partition, + subJob{ + subJobs: unprocessedList.Jobs, + hasMore: false, + rsourcesStats: rsourcesStats, + }, + ), + ) if err != nil { panic(err) } diff --git a/processor/processor_test.go b/processor/processor_test.go index 851ea7ae35..eea6a0a8ae 100644 --- a/processor/processor_test.go +++ b/processor/processor_test.go @@ -964,7 +964,7 @@ var _ = Describe("Tracking Plan Validation", Ordered, func() { Expect(processor.config.asyncInit.WaitContext(ctx)).To(BeNil()) GinkgoT().Log("Processor setup and init done") - _, _ = processor.processJobsForDest( + _, _ = processor.generateTransformationMessage(processor.processJobsForDest( "", subJob{ subJobs: []*jobsdb.JobT{ @@ -1011,7 +1011,7 @@ var _ = Describe("Tracking Plan Validation", Ordered, func() { }, }, }, - ) + )) Expect(c.MockObserver.calls).To(HaveLen(1)) for _, v := range c.MockObserver.calls { @@ -1039,7 +1039,7 @@ var _ = Describe("Tracking Plan Validation", Ordered, func() { Expect(processor.config.asyncInit.WaitContext(ctx)).To(BeNil()) GinkgoT().Log("Processor setup and init done") - _, _ = processor.processJobsForDest( + _, _ = processor.generateTransformationMessage(processor.processJobsForDest( "", subJob{ subJobs: []*jobsdb.JobT{ @@ -1092,7 +1092,7 @@ var _ = Describe("Tracking Plan Validation", Ordered, func() { }, }, }, - ) + )) Expect(c.MockObserver.calls).To(HaveLen(1)) for _, v := range c.MockObserver.calls { @@ -1292,12 +1292,12 @@ var _ = Describe("Processor with event schemas v2", Ordered, func() { defer cancel() Expect(processor.config.asyncInit.WaitContext(ctx)).To(BeNil()) GinkgoT().Log("Processor setup and init done") - _, _ = processor.processJobsForDest( + _, _ = processor.generateTransformationMessage(processor.processJobsForDest( "", subJob{ subJobs: unprocessedJobsList, }, - ) + )) 
Expect(c.MockObserver.calls).To(HaveLen(1)) }) @@ -1463,12 +1463,12 @@ var _ = Describe("Processor with event schemas v2", Ordered, func() { defer cancel() Expect(processor.config.asyncInit.WaitContext(ctx)).To(BeNil()) GinkgoT().Log("Processor setup and init done") - _, _ = processor.processJobsForDest( + _, _ = processor.generateTransformationMessage(processor.processJobsForDest( "", subJob{ subJobs: unprocessedJobsList, }, - ) + )) Expect(c.MockObserver.calls).To(HaveLen(1)) }) @@ -1648,12 +1648,12 @@ var _ = Describe("Processor with ArchivalV2 enabled", Ordered, func() { defer cancel() Expect(processor.config.asyncInit.WaitContext(ctx)).To(BeNil()) GinkgoT().Log("Processor setup and init done") - _, _ = processor.processJobsForDest( + _, _ = processor.generateTransformationMessage(processor.processJobsForDest( "", subJob{ subJobs: unprocessedJobsList, }, - ) + )) Expect(c.MockObserver.calls).To(HaveLen(1)) }) @@ -1805,12 +1805,12 @@ var _ = Describe("Processor with ArchivalV2 enabled", Ordered, func() { defer cancel() Expect(processor.config.asyncInit.WaitContext(ctx)).To(BeNil()) GinkgoT().Log("Processor setup and init done") - _, _ = processor.processJobsForDest( + _, _ = processor.generateTransformationMessage(processor.processJobsForDest( "", subJob{ subJobs: unprocessedJobsList, }, - ) + )) Expect(c.MockObserver.calls).To(HaveLen(1)) }) @@ -1956,12 +1956,12 @@ var _ = Describe("Processor with ArchivalV2 enabled", Ordered, func() { defer cancel() Expect(processor.config.asyncInit.WaitContext(ctx)).To(BeNil()) GinkgoT().Log("Processor setup and init done") - _, _ = processor.processJobsForDest( + _, _ = processor.generateTransformationMessage(processor.processJobsForDest( "", subJob{ subJobs: unprocessedJobsList, }, - ) + )) Expect(c.MockObserver.calls).To(HaveLen(1)) }) diff --git a/processor/worker.go b/processor/worker.go index 6b18dc284f..fc43ad75ca 100644 --- a/processor/worker.go +++ b/processor/worker.go @@ -21,6 +21,7 @@ func 
newProcessorWorker(partition string, h workerHandle) *worker { } w.lifecycle.ctx, w.lifecycle.cancel = context.WithCancel(context.Background()) w.channel.preprocess = make(chan subJob, w.handle.config().pipelineBufferedItems) + w.channel.preTransform = make(chan *preTransformationMessage, w.handle.config().pipelineBufferedItems) w.channel.transform = make(chan *transformationMessage, w.handle.config().pipelineBufferedItems) w.channel.store = make(chan *storeMessage, (w.handle.config().pipelineBufferedItems+1)*(w.handle.config().maxEventsToProcess.Load()/w.handle.config().subJobSize+1)) w.start() @@ -45,9 +46,10 @@ type worker struct { wg sync.WaitGroup // worker wait group } channel struct { // worker channels - preprocess chan subJob // preprocess channel is used to send jobs to preprocess asynchronously when pipelining is enabled - transform chan *transformationMessage // transform channel is used to send jobs to transform asynchronously when pipelining is enabled - store chan *storeMessage // store channel is used to send jobs to store asynchronously when pipelining is enabled + preprocess chan subJob // preprocess channel is used to send jobs to preprocess asynchronously when pipelining is enabled + preTransform chan *preTransformationMessage // preTransform channel is used to send preprocessed jobs to the step that generates the transformation message (archival, event schema and tracking plan validation) + transform chan *transformationMessage // transform channel is used to send jobs to transform asynchronously when pipelining is enabled + store chan *storeMessage // store channel is used to send jobs to store asynchronously when pipelining is enabled } } @@ -69,12 +71,21 @@ func (w *worker) start() { w.lifecycle.wg.Add(1) rruntime.Go(func() { defer w.lifecycle.wg.Done() - defer close(w.channel.transform) + defer close(w.channel.preTransform) defer w.logger.Debugf("preprocessing routine stopped for worker: %s", w.partition) for jobs := range w.channel.preprocess { - var val *transformationMessage - var err error - val, err =
w.handle.processJobsForDest(w.partition, jobs) + val := w.handle.processJobsForDest(w.partition, jobs) + w.channel.preTransform <- val + } + }) + + w.lifecycle.wg.Add(1) + rruntime.Go(func() { + defer w.lifecycle.wg.Done() + defer close(w.channel.transform) + defer w.logger.Debugf("pretransform routine stopped for worker: %s", w.partition) + for processedMessage := range w.channel.preTransform { + val, err := w.handle.generateTransformationMessage(processedMessage) if err != nil { panic(err) } diff --git a/processor/worker_handle.go b/processor/worker_handle.go index a87574b703..34028aa9ad 100644 --- a/processor/worker_handle.go +++ b/processor/worker_handle.go @@ -23,7 +23,8 @@ type workerHandle interface { getJobs(partition string) jobsdb.JobsResult markExecuting(partition string, jobs []*jobsdb.JobT) error jobSplitter(jobs []*jobsdb.JobT, rsourcesStats rsources.StatsCollector) []subJob - processJobsForDest(partition string, subJobs subJob) (*transformationMessage, error) + processJobsForDest(partition string, subJobs subJob) *preTransformationMessage + generateTransformationMessage(preTrans *preTransformationMessage) (*transformationMessage, error) transformations(partition string, in *transformationMessage) *storeMessage Store(partition string, in *storeMessage) } diff --git a/processor/worker_test.go b/processor/worker_test.go index 49c47aa73e..5823a0c70e 100644 --- a/processor/worker_test.go +++ b/processor/worker_test.go @@ -221,7 +221,7 @@ func (m *mockWorkerHandle) handlePendingGatewayJobs(partition string) bool { for _, subJob := range m.jobSplitter(jobs.Jobs, rsourcesStats) { var dest *transformationMessage var err error - dest, err = m.processJobsForDest(partition, subJob) + dest, err = m.generateTransformationMessage(m.processJobsForDest(partition, subJob)) if err != nil { return false } @@ -311,7 +311,7 @@ func (m *mockWorkerHandle) jobSplitter(jobs []*jobsdb.JobT, rsourcesStats rsourc } } -func (m *mockWorkerHandle) processJobsForDest(partition 
string, subJobs subJob) (*transformationMessage, error) { +func (m *mockWorkerHandle) processJobsForDest(partition string, subJobs subJob) *preTransformationMessage { if m.limiters.process != nil { defer m.limiters.process.Begin("")() } @@ -323,9 +323,16 @@ func (m *mockWorkerHandle) processJobsForDest(partition string, subJobs subJob) m.partitionStats[partition] = s m.log.Infof("processJobsForDest partition: %s stats: %+v", partition, s) - return &transformationMessage{ + return &preTransformationMessage{ totalEvents: len(subJobs.subJobs), - hasMore: subJobs.hasMore, + subJobs: subJobs, + } +} + +func (m *mockWorkerHandle) generateTransformationMessage(in *preTransformationMessage) (*transformationMessage, error) { + return &transformationMessage{ + totalEvents: in.totalEvents, + hasMore: in.subJobs.hasMore, trackedUsersReports: []*trackedusers.UsersReport{ {WorkspaceID: sampleWorkspaceID}, },