-
Notifications
You must be signed in to change notification settings - Fork 213
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Zero Buffer Implementation and Tests #5416
base: main
Are you sure you want to change the base?
Zero Buffer Implementation and Tests #5416
Conversation
Signed-off-by: Mohammed Aghil Puthiyottil <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @MohammedAghil . This is an exciting new feature!
Metrics.addRegistry(new SimpleMeterRegistry()); | ||
} | ||
|
||
/*-------------------------Tests for Writing to ZeroBuffer---------------------------*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think these comments will remain valid as we modify this code.
One approach you can use is to create JUnit @Nested
tests. These will even report tests in groups. And you may even find opportunities to create common setup.
Here is an example:
Lines 294 to 295 in 7d15115
@Nested | |
class EqualsAndHashCodeAndToString { |
|
||
dependencies { | ||
implementation project(':data-prepper-api') | ||
implementation project(':data-prepper-core') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we should depend on data-prepper-core here.
I think it may make the most sense to move this project (zero-buffer
) into data-prepper-core.
|
||
@DataPrepperPluginConstructor | ||
public ZeroBuffer(PipelineDescription pipelineDescription) { | ||
this.pluginMetrics = PluginMetrics.fromNames(PLUGIN_COMPONENT_ID, pipelineDescription.getPipelineName()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can now get this from dependency injection. @san81 , is that correct?
readRecordsCounter.increment(storedRecords.size() * 1.0); | ||
} | ||
|
||
return new AbstractMap.SimpleEntry<>(storedRecords, checkpointState); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use Map.entry(storedRecords, checkpointState)
instead.
if (storedRecords!= null && !storedRecords.isEmpty()) { | ||
checkpointState = new CheckpointState(storedRecords.size()); | ||
threadLocalStore.remove(); | ||
readRecordsCounter.increment(storedRecords.size() * 1.0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why * 1.0
? If it is make a double, may just cast.
* Represents the base class for zero buffer implementation and implements {@link Buffer} interface. | ||
* It provides a common functionality to run all processors and sinks within the same thread context. | ||
*/ | ||
public abstract class AbstractZeroBuffer <T extends Record<?>> implements Buffer<T> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rather than an abstract class, make this an interface. Maybe something like SupportsPipelineRunner
.
interface SupportsPipelineRunner {
PipelineRunner getPipelineRunner();
void setPipelineRunner(PipelineRunner pipelineRunner);
}
Then move the implementation into ZeroBuffer
.
import java.util.Map; | ||
import java.util.concurrent.TimeoutException; | ||
|
||
@DataPrepperPlugin(name = "zero_buffer", pluginType = Buffer.class) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this just be named zero
? We have moved away from including the type suffix. For example, we went from otel_trace_source
to otel_trace
.
} | ||
|
||
if (threadLocalStore.get() == null) { | ||
threadLocalStore.set(records); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As you are possibly reusing the thread local store, you actually are modifying this input list. You should make a copy of this. new ArrayList<>(records)
.
Description
Added Zero Buffer Plugin Implementation and Tests.
Issues Resolved
Resolves [#5415]
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.