-
Notifications
You must be signed in to change notification settings - Fork 213
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add stateful buffer for lambda sink #5354
base: main
Are you sure you want to change the base?
Add stateful buffer for lambda sink #5354
Conversation
Signed-off-by: Srikanth Govindarajan <[email protected]>
Signed-off-by: Srikanth Govindarajan <[email protected]>
|
||
// The partial buffer that may not yet have reached threshold. | ||
// Access must be synchronized | ||
private Buffer statefulBuffer; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should handle concurrent actions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, all i have used "synchronized" on the methods where this is used. In s3 sink, they use reentrant lock to handle something similar.
Signed-off-by: Srikanth Govindarajan <[email protected]>
for (Buffer buf : buffersToFlush) { | ||
combinedRecords.addAll(buf.getRecords()); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I may be not clear on the overall approach here but Why are we combining the buffers here? also, may be not a big deal but we are creating additional copy of the payload here.
|
||
verify(numberOfRecordsFailedCounter, times(1)).increment(1); | ||
// Utility to set private fields | ||
private static void setPrivateField(Object target, String fieldName, Object value) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a data-prepper-test-common
module that has these kind of common reusable methods used for test cases. We can reuse these utilities from there and avoid adding these methods in every plugin.
Signed-off-by: Srikanth Govindarajan <[email protected]>
Description
Add Stateful Buffering to LambdaSink
Issues Resolved
Resolves #5353
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.