Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zero Buffer Implementation and Tests #5416

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

MohammedAghil
Copy link

@MohammedAghil MohammedAghil commented Feb 5, 2025

Description

Added Zero Buffer Plugin Implementation and Tests.

  • PipelineRunner: Defined an interface for executing all processors and publishing to sinks. The functionality for this will be implemented in a future commit.
  • AbstractZeroBuffer: Added base class for zero buffer with the pipeline runner context
  • ZeroBuffer: Concrete implementation of AbstractZeroBuffer
  • ZeroBufferTests: Test cases for the ZeroBuffer to ensure correct functionality.
  • Added Zerobuffer as a Dataprepper plugin

Issues Resolved

Resolves [#5415]

Check List

  • New functionality includes testing.
  • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: Mohammed Aghil Puthiyottil <[email protected]>
Copy link
Member

@dlvenable dlvenable left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @MohammedAghil . This is an exciting new feature!

Metrics.addRegistry(new SimpleMeterRegistry());
}

/*-------------------------Tests for Writing to ZeroBuffer---------------------------*/
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think these comments will remain valid as we modify this code.

One approach you can use is to create JUnit @Nested tests. These will even report tests in groups. And you may even find opportunities to create common setup.

Here is an example:


dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-core')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should depend on data-prepper-core here.

I think it may make the most sense to move this project (zero-buffer) into data-prepper-core.


@DataPrepperPluginConstructor
public ZeroBuffer(PipelineDescription pipelineDescription) {
this.pluginMetrics = PluginMetrics.fromNames(PLUGIN_COMPONENT_ID, pipelineDescription.getPipelineName());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can now get this from dependency injection. @san81 , is that correct?

readRecordsCounter.increment(storedRecords.size() * 1.0);
}

return new AbstractMap.SimpleEntry<>(storedRecords, checkpointState);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use Map.entry(storedRecords, checkpointState) instead.

if (storedRecords!= null && !storedRecords.isEmpty()) {
checkpointState = new CheckpointState(storedRecords.size());
threadLocalStore.remove();
readRecordsCounter.increment(storedRecords.size() * 1.0);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why * 1.0? If it is make a double, may just cast.

* Represents the base class for zero buffer implementation and implements {@link Buffer} interface.
* It provides a common functionality to run all processors and sinks within the same thread context.
*/
public abstract class AbstractZeroBuffer <T extends Record<?>> implements Buffer<T> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than an abstract class, make this an interface. Maybe something like SupportsPipelineRunner.

interface SupportsPipelineRunner {
  PipelineRunner getPipelineRunner();
  void setPipelineRunner(PipelineRunner pipelineRunner);
}

Then move the implementation into ZeroBuffer.

import java.util.Map;
import java.util.concurrent.TimeoutException;

@DataPrepperPlugin(name = "zero_buffer", pluginType = Buffer.class)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this just be named zero? We have moved away from including the type suffix. For example, we went from otel_trace_source to otel_trace.

}

if (threadLocalStore.get() == null) {
threadLocalStore.set(records);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As you are possibly reusing the thread local store, you actually are modifying this input list. You should make a copy of this. new ArrayList<>(records).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants