chore: generateTransformationMessage is its own concurrent step #5449

Open · wants to merge 1 commit into base: master
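In short, processJobsForDest no longer returns a (*transformationMessage, error): it now returns a *preTransformationMessage, and generateTransformationMessage runs as its own concurrent, pipelined step in the processor worker, fed by a new preTransform channel. A minimal sketch of the resulting stage wiring, with stand-in types and a stand-in handle (the real message structs and handle carry far more state, and the store stage is reduced to a print loop):

```go
// Sketch only: stage names and channel wiring mirror the diff; types are stand-ins.
package main

import (
	"fmt"
	"sync"
)

type subJob struct{ jobs []string }
type preTransformationMessage struct{ jobs []string }
type transformationMessage struct{ jobs []string }

type handle struct{}

// processJobsForDest now returns a *preTransformationMessage and no error.
func (h *handle) processJobsForDest(partition string, s subJob) *preTransformationMessage {
	return &preTransformationMessage{jobs: s.jobs}
}

// generateTransformationMessage is the step this PR turns into its own pipeline stage.
func (h *handle) generateTransformationMessage(p *preTransformationMessage) (*transformationMessage, error) {
	return &transformationMessage{jobs: p.jobs}, nil
}

func main() {
	h := &handle{}
	preprocess := make(chan subJob, 1)
	preTransform := make(chan *preTransformationMessage, 1) // new channel introduced by this PR
	transform := make(chan *transformationMessage, 1)

	var wg sync.WaitGroup

	wg.Add(1)
	go func() { // preprocess stage
		defer wg.Done()
		defer close(preTransform)
		for jobs := range preprocess {
			preTransform <- h.processJobsForDest("partition-1", jobs)
		}
	}()

	wg.Add(1)
	go func() { // pretransform stage: the new concurrent step
		defer wg.Done()
		defer close(transform)
		for msg := range preTransform {
			val, err := h.generateTransformationMessage(msg)
			if err != nil {
				panic(err)
			}
			transform <- val
		}
	}()

	wg.Add(1)
	go func() { // transform stage consumer, heavily simplified
		defer wg.Done()
		for msg := range transform {
			fmt.Printf("transforming %d jobs\n", len(msg.jobs))
		}
	}()

	preprocess <- subJob{jobs: []string{"job-1", "job-2"}}
	close(preprocess)
	wg.Wait()
}
```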
73 changes: 38 additions & 35 deletions processor/processor.go
@@ -1658,7 +1658,7 @@
dedupKeys map[string]struct{}
}

- func (proc *Handle) processJobsForDest(partition string, subJobs subJob) (*transformationMessage, error) {
+ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) *preTransformationMessage {
if proc.limiter.preprocess != nil {
defer proc.limiter.preprocess.BeginWithPriority(partition, proc.getLimiterPriority(partition))()
}
@@ -1734,7 +1734,7 @@
for _, batchEvent := range jobList {
var eventParams types.EventParams
if err := jsonfast.Unmarshal(batchEvent.Parameters, &eventParams); err != nil {
- return nil, err
+ panic(err)

Codecov / codecov/patch check warning on processor/processor.go#L1737: added line #L1737 was not covered by tests.
Reviewer comment (Contributor): let's return an error from here; then we can handle errors in a single place, in worker.go.
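A rough sketch of that suggestion (hypothetical, not part of this diff): keep the error return on processJobsForDest and let the preprocess loop in worker.go remain the single place where the error surfaces, for example:

```go
// processor.go (sketch): propagate the unmarshal error instead of panicking
if err := jsonfast.Unmarshal(batchEvent.Parameters, &eventParams); err != nil {
	return nil, err
}

// worker.go, preprocess goroutine (sketch): the one place the error is handled
for jobs := range w.channel.preprocess {
	val, err := w.handle.processJobsForDest(w.partition, jobs)
	if err != nil {
		panic(err)
	}
	w.channel.preTransform <- val
}
```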

}

var span stats.TraceSpan
@@ -1837,7 +1837,7 @@
if proc.config.enableDedup {
keyMap, err = proc.dedup.GetBatch(dedupKeysWithWorkspaceID)
if err != nil {
- return nil, err
+ panic(err)

Codecov / codecov/patch check warning on processor/processor.go#L1840: added line #L1840 was not covered by tests.
}
}
for _, event := range jobsWithMetaData {
@@ -1983,33 +1983,32 @@
panic(fmt.Errorf("len(statusList):%d != len(jobList):%d", len(statusList), len(jobList)))
}

- return proc.generateTransformationMessage(
- &preTransformationMessage{
- partition: partition,
- subJobs: subJobs,
- eventSchemaJobs: eventSchemaJobs,
- archivalJobs: archivalJobs,
- connectionDetailsMap: connectionDetailsMap,
- statusDetailsMap: statusDetailsMap,
- reportMetrics: reportMetrics,
- destFilterStatusDetailMap: destFilterStatusDetailMap,
- inCountMetadataMap: inCountMetadataMap,
- inCountMap: inCountMap,
- outCountMap: outCountMap,
- totalEvents: totalEvents,
- marshalStart: marshalStart,
- groupedEventsBySourceId: groupedEventsBySourceId,
- eventsByMessageID: eventsByMessageID,
- procErrorJobs: procErrorJobs,
- jobIDToSpecificDestMapOnly: jobIDToSpecificDestMapOnly,
- groupedEvents: groupedEvents,
- uniqueMessageIdsBySrcDestKey: uniqueMessageIdsBySrcDestKey,
- statusList: statusList,
- jobList: jobList,
- start: start,
- sourceDupStats: sourceDupStats,
- dedupKeys: dedupKeys,
- })
+ return &preTransformationMessage{
+ partition: partition,
+ subJobs: subJobs,
+ eventSchemaJobs: eventSchemaJobs,
+ archivalJobs: archivalJobs,
+ connectionDetailsMap: connectionDetailsMap,
+ statusDetailsMap: statusDetailsMap,
+ reportMetrics: reportMetrics,
+ destFilterStatusDetailMap: destFilterStatusDetailMap,
+ inCountMetadataMap: inCountMetadataMap,
+ inCountMap: inCountMap,
+ outCountMap: outCountMap,
+ totalEvents: totalEvents,
+ marshalStart: marshalStart,
+ groupedEventsBySourceId: groupedEventsBySourceId,
+ eventsByMessageID: eventsByMessageID,
+ procErrorJobs: procErrorJobs,
+ jobIDToSpecificDestMapOnly: jobIDToSpecificDestMapOnly,
+ groupedEvents: groupedEvents,
+ uniqueMessageIdsBySrcDestKey: uniqueMessageIdsBySrcDestKey,
+ statusList: statusList,
+ jobList: jobList,
+ start: start,
+ sourceDupStats: sourceDupStats,
+ dedupKeys: dedupKeys,
+ }
}

func (proc *Handle) generateTransformationMessage(preTrans *preTransformationMessage) (*transformationMessage, error) {
@@ -3283,11 +3282,15 @@

var transMessage *transformationMessage
var err error
- transMessage, err = proc.processJobsForDest(partition, subJob{
- subJobs: unprocessedList.Jobs,
- hasMore: false,
- rsourcesStats: rsourcesStats,
- })
+ transMessage, err = proc.generateTransformationMessage(proc.processJobsForDest(
+ partition,
+ subJob{
+ subJobs: unprocessedList.Jobs,
+ hasMore: false,
+ rsourcesStats: rsourcesStats,
+ },
+ ),
+ )

Reviewer comment (Contributor), attached to the processJobsForDest call above: not related to this PR, but is there a possibility of duplicate persistence of jobs in event schemas and archival?
if err != nil {
panic(err)
}
28 changes: 14 additions & 14 deletions processor/processor_test.go
@@ -964,7 +964,7 @@ var _ = Describe("Tracking Plan Validation", Ordered, func() {
Expect(processor.config.asyncInit.WaitContext(ctx)).To(BeNil())
GinkgoT().Log("Processor setup and init done")

- _, _ = processor.processJobsForDest(
+ _, _ = processor.generateTransformationMessage(processor.processJobsForDest(
"",
subJob{
subJobs: []*jobsdb.JobT{
@@ -1011,7 +1011,7 @@
},
},
},
- )
+ ))

Expect(c.MockObserver.calls).To(HaveLen(1))
for _, v := range c.MockObserver.calls {
@@ -1039,7 +1039,7 @@ var _ = Describe("Tracking Plan Validation", Ordered, func() {
Expect(processor.config.asyncInit.WaitContext(ctx)).To(BeNil())
GinkgoT().Log("Processor setup and init done")

- _, _ = processor.processJobsForDest(
+ _, _ = processor.generateTransformationMessage(processor.processJobsForDest(
"",
subJob{
subJobs: []*jobsdb.JobT{
@@ -1092,7 +1092,7 @@
},
},
},
- )
+ ))

Expect(c.MockObserver.calls).To(HaveLen(1))
for _, v := range c.MockObserver.calls {
@@ -1292,12 +1292,12 @@ var _ = Describe("Processor with event schemas v2", Ordered, func() {
defer cancel()
Expect(processor.config.asyncInit.WaitContext(ctx)).To(BeNil())
GinkgoT().Log("Processor setup and init done")
- _, _ = processor.processJobsForDest(
+ _, _ = processor.generateTransformationMessage(processor.processJobsForDest(
"",
subJob{
subJobs: unprocessedJobsList,
},
- )
+ ))

Expect(c.MockObserver.calls).To(HaveLen(1))
})
@@ -1463,12 +1463,12 @@
defer cancel()
Expect(processor.config.asyncInit.WaitContext(ctx)).To(BeNil())
GinkgoT().Log("Processor setup and init done")
- _, _ = processor.processJobsForDest(
+ _, _ = processor.generateTransformationMessage(processor.processJobsForDest(
"",
subJob{
subJobs: unprocessedJobsList,
},
- )
+ ))

Expect(c.MockObserver.calls).To(HaveLen(1))
})
@@ -1648,12 +1648,12 @@ var _ = Describe("Processor with ArchivalV2 enabled", Ordered, func() {
defer cancel()
Expect(processor.config.asyncInit.WaitContext(ctx)).To(BeNil())
GinkgoT().Log("Processor setup and init done")
- _, _ = processor.processJobsForDest(
+ _, _ = processor.generateTransformationMessage(processor.processJobsForDest(
"",
subJob{
subJobs: unprocessedJobsList,
},
- )
+ ))

Expect(c.MockObserver.calls).To(HaveLen(1))
})
@@ -1805,12 +1805,12 @@
defer cancel()
Expect(processor.config.asyncInit.WaitContext(ctx)).To(BeNil())
GinkgoT().Log("Processor setup and init done")
- _, _ = processor.processJobsForDest(
+ _, _ = processor.generateTransformationMessage(processor.processJobsForDest(
"",
subJob{
subJobs: unprocessedJobsList,
},
- )
+ ))

Expect(c.MockObserver.calls).To(HaveLen(1))
})
@@ -1956,12 +1956,12 @@
defer cancel()
Expect(processor.config.asyncInit.WaitContext(ctx)).To(BeNil())
GinkgoT().Log("Processor setup and init done")
- _, _ = processor.processJobsForDest(
+ _, _ = processor.generateTransformationMessage(processor.processJobsForDest(
"",
subJob{
subJobs: unprocessedJobsList,
},
- )
+ ))

Expect(c.MockObserver.calls).To(HaveLen(1))
})
25 changes: 18 additions & 7 deletions processor/worker.go
@@ -21,6 +21,7 @@ func newProcessorWorker(partition string, h workerHandle) *worker {
}
w.lifecycle.ctx, w.lifecycle.cancel = context.WithCancel(context.Background())
w.channel.preprocess = make(chan subJob, w.handle.config().pipelineBufferedItems)
+ w.channel.preTransform = make(chan *preTransformationMessage, w.handle.config().pipelineBufferedItems)
w.channel.transform = make(chan *transformationMessage, w.handle.config().pipelineBufferedItems)
w.channel.store = make(chan *storeMessage, (w.handle.config().pipelineBufferedItems+1)*(w.handle.config().maxEventsToProcess.Load()/w.handle.config().subJobSize+1))
w.start()
@@ -45,9 +46,10 @@ type worker struct {
wg sync.WaitGroup // worker wait group
}
channel struct { // worker channels
- preprocess chan subJob // preprocess channel is used to send jobs to preprocess asynchronously when pipelining is enabled
- transform chan *transformationMessage // transform channel is used to send jobs to transform asynchronously when pipelining is enabled
- store chan *storeMessage // store channel is used to send jobs to store asynchronously when pipelining is enabled
+ preprocess chan subJob // preprocess channel is used to send jobs to preprocess asynchronously when pipelining is enabled
+ preTransform chan *preTransformationMessage // preTransform is used to send jobs to store to arc, esch and tracking plan validation
+ transform chan *transformationMessage // transform channel is used to send jobs to transform asynchronously when pipelining is enabled
+ store chan *storeMessage // store channel is used to send jobs to store asynchronously when pipelining is enabled
}
}

@@ -69,12 +71,21 @@ func (w *worker) start() {
w.lifecycle.wg.Add(1)
rruntime.Go(func() {
defer w.lifecycle.wg.Done()
- defer close(w.channel.transform)
+ defer close(w.channel.preTransform)
defer w.logger.Debugf("preprocessing routine stopped for worker: %s", w.partition)
for jobs := range w.channel.preprocess {
- var val *transformationMessage
- var err error
- val, err = w.handle.processJobsForDest(w.partition, jobs)
+ val := w.handle.processJobsForDest(w.partition, jobs)
+ w.channel.preTransform <- val
+ }
+ })
+
+ w.lifecycle.wg.Add(1)
+ rruntime.Go(func() {
+ defer w.lifecycle.wg.Done()
+ defer close(w.channel.transform)
+ defer w.logger.Debugf("pretransform routine stopped for worker: %s", w.partition)
+ for processedMessage := range w.channel.preTransform {
+ val, err := w.handle.generateTransformationMessage(processedMessage)
if err != nil {
panic(err)
}
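The close chain above follows the usual Go pipeline-shutdown pattern: each stage defers closing the channel it feeds, so once preprocess is closed the preprocess goroutine drains and closes preTransform, the new pretransform goroutine then drains and closes transform, and so on downstream. A minimal, generic illustration of that pattern (not rudder-server code):

```go
package main

import "fmt"

func main() {
	in := make(chan int, 3)
	mid := make(chan int, 3)
	out := make(chan int, 3)

	go func() {
		defer close(mid) // closing mid lets the next stage's range loop finish
		for v := range in {
			mid <- v * 2
		}
	}()

	go func() {
		defer close(out)
		for v := range mid {
			out <- v + 1
		}
	}()

	for _, v := range []int{1, 2, 3} {
		in <- v
	}
	close(in) // shutdown propagates: in, then mid, then out

	for v := range out {
		fmt.Println(v) // prints 3, 5, 7
	}
}
```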
3 changes: 2 additions & 1 deletion processor/worker_handle.go
@@ -23,7 +23,8 @@ type workerHandle interface {
getJobs(partition string) jobsdb.JobsResult
markExecuting(partition string, jobs []*jobsdb.JobT) error
jobSplitter(jobs []*jobsdb.JobT, rsourcesStats rsources.StatsCollector) []subJob
- processJobsForDest(partition string, subJobs subJob) (*transformationMessage, error)
+ processJobsForDest(partition string, subJobs subJob) *preTransformationMessage
+ generateTransformationMessage(preTrans *preTransformationMessage) (*transformationMessage, error)
transformations(partition string, in *transformationMessage) *storeMessage
Store(partition string, in *storeMessage)
}
15 changes: 11 additions & 4 deletions processor/worker_test.go
@@ -221,7 +221,7 @@ func (m *mockWorkerHandle) handlePendingGatewayJobs(partition string) bool {
for _, subJob := range m.jobSplitter(jobs.Jobs, rsourcesStats) {
var dest *transformationMessage
var err error
- dest, err = m.processJobsForDest(partition, subJob)
+ dest, err = m.generateTransformationMessage(m.processJobsForDest(partition, subJob))
if err != nil {
return false
}
@@ -311,7 +311,7 @@ func (m *mockWorkerHandle) jobSplitter(jobs []*jobsdb.JobT, rsourcesStats rsourc
}
}

- func (m *mockWorkerHandle) processJobsForDest(partition string, subJobs subJob) (*transformationMessage, error) {
+ func (m *mockWorkerHandle) processJobsForDest(partition string, subJobs subJob) *preTransformationMessage {
if m.limiters.process != nil {
defer m.limiters.process.Begin("")()
}
@@ -323,9 +323,16 @@ func (m *mockWorkerHandle) processJobsForDest(partition string, subJobs subJob)
m.partitionStats[partition] = s
m.log.Infof("processJobsForDest partition: %s stats: %+v", partition, s)

- return &transformationMessage{
+ return &preTransformationMessage{
totalEvents: len(subJobs.subJobs),
- hasMore: subJobs.hasMore,
+ subJobs: subJobs,
+ }
+ }
+
+ func (m *mockWorkerHandle) generateTransformationMessage(in *preTransformationMessage) (*transformationMessage, error) {
+ return &transformationMessage{
+ totalEvents: in.totalEvents,
+ hasMore: in.subJobs.hasMore,
trackedUsersReports: []*trackedusers.UsersReport{
{WorkspaceID: sampleWorkspaceID},
},