Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add s3 batch consumer #43

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft

Add s3 batch consumer #43

wants to merge 4 commits into from

Conversation

cortze
Copy link
Contributor

@cortze cortze commented Jan 16, 2025

Description

Due to the large number of traces Hermes can generate, we've decided to include the option to submit the traces (batched) into a given S3 bucket. Furthermore, we've agreed (at least so far) to rely on parquet files to make the traces easier to import on the Data processing side.

This PR adds that functionality into Hermes, adding all the necessary checks and tests to ensure nothing is broken.

Tasks:

  • S3 configuration flags at cmd
  • S3 batcher datastream interface
  • Local test for the s3 batcher interfaces
  • Parquet formating support for traces
  • Integration of S3 tests with the localstack s3 docker image
  • (Optional) possibility of defining from a template which metrics do we want to trace within Hermes

NOTE: this PR is still WIP, as I'll need to check the performance of both: the parquet formater and the s3 submitter.

@dennis-tra dennis-tra self-requested a review January 17, 2025 07:53
host/s3.go Show resolved Hide resolved
Comment on lines +28 to +29
ctx context.Context
cancelFn context.CancelFunc
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If possible, let's avoid contexts on structs. From experience this was always possible with some restructuring. I think this is one of my more controversial opinions when it comes to writing Go code :D I just checked the rest of the code and the Kinesis Datastream also follows this pattern. So better be consistent instead of mixing patterns.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, I had the same dilemma in mind when I saw the AWS kinesis DataStream implementation, so I decided to keep it constant.

Happy to reorg the logic if you think it's a dealbreaker and it doesn't take much time to implement

host/s3.go Show resolved Hide resolved
}

func (s3ds *S3DataStream) OutputType() DataStreamOutputType {
return DataStreamOutputTypeKinesis
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this correct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the naming might not be the clearest one, but there are only two types of Outputs:

  • The original Kinesis one
  • The extended one with more details for the Callback function (added by the EF team, and used at Xatu)

host/s3.go Show resolved Hide resolved
host/s3.go Show resolved Hide resolved
opCtx, cancel := context.WithTimeout(ctx, S3OpTimeout)
defer cancel()

s3ds.client.DeleteObject(opCtx, &s3.DeleteObjectInput{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't this return an error?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah just for testing. Maybe still worth adding if at some point in the future someone just uses this method and expects to see an error?


}
}
}()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would be great to ensure proper clean up of this go-routine by signaling the S3DataStream that this go routine has exited.

I would say:

  • add a flusherDone chan struct{} to the S3DataStream struct and initialize it to an unbounded channel
  • in this method here call add defer close(s3ds.flusherDone) to the inner go routine
  • in the Stop() method wait for the flusherDone channel to be closed before exiting.
  • Just to be sure that the process always exits properly you could also do it with a timeout like
select {
case <-time.After(5*time.Second):
  // log that something didn't go as planed
case <-s3ds.flusherDone:
}

But if everything is setup correctly you wouldn't have to.

Lastly, in theory we then should make sure that spawnPeriodicFlusher is only called once during the lifecycle of the s3ds datastream. Otherwise the channel would be closed twice and we would panic. However, that also shouldn't happen - so I'd be fine not guarding for that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that is indeed a really nice addition, thus the ctx given to the spawnPeriodicFlusher wasn't the main one. Updating it

host/s3.go Show resolved Hide resolved
Comment on lines +403 to +411
func (b *traceBatcher) reset() []ParquetTraceEvent {
b.Lock()
prevTraces := make([]ParquetTraceEvent, len(b.traces))
for i, trace := range b.traces {
prevTraces[i] = *trace.toParquet()
}
b.traces = make([]*TraceEvent, 0)
b.Unlock()
return prevTraces
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To avoid the data copy I think (not sure) that you can do something like this:

func (b *traceBatcher) reset() []ParquetTraceEvent {
	b.Lock()
	prevTraces := b.traces
	b.traces = nil
	b.Unlock()
	return prevTraces

I'm really not sure about this.

@dennis-tra
Copy link
Contributor

Awesome that you have added all the tests!

require.NoError(t, err)

// wait 2,5 secs (flusher should kick in)
time.Sleep(2500 * time.Millisecond)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's avoid sleeps in tests. Is there another synchronization way?


// submit the traces
s3ds.submitRecords(ctx)
time.Sleep(300 * time.Millisecond)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here. I'm happy to brainstorm how to add synchronization

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants