Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Address multi-threading issues with AWS Lambda Plugin #5194

Open
wants to merge 4 commits into
base: main
Choose a base branch
from

Conversation

srikanthjg
Copy link
Contributor

Description

Address multi-threading issues with AWS Lambda Plugin

Issues Resolved

Resolves #4700

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: Srikanth Govindarajan <[email protected]>
Signed-off-by: Srikanth Govindarajan <[email protected]>
@@ -17,6 +17,6 @@

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
@Target({ElementType.CONSTRUCTOR, ElementType.TYPE})
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this needed?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove this.

@@ -26,5 +31,6 @@ public void handleEvents(List<Event> parsedEvents, List<Record<Event>> originalR
originalAcknowledgementSet.add(responseEvent);
}
}
LOG.info("Successfully handled {} events in Aggregate response strategy", parsedEvents.size());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may be too many log statements

@@ -149,93 +155,136 @@ public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
return records;
}

//lambda mutates event
List<Record<Event>> resultRecords = new ArrayList<>();
reentrantLock.lock();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You shouldn't need this. This means multiple threads become serialized.

List<Event> parsedEvents = new ArrayList<>();
InputStream inputStream = PayloadValidator.validateAndGetInputStream(payload);
responseCodec.parse(inputStream, record -> parsedEvents.add(record.getData()));
LOG.info("Parsed successfully");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make debug/trace. This would result in too much logging. The metrics should indicate this.

resultRecords.add(originalRecords.get(i));
}
}catch (Exception e){
LOG.info("SRI ERRRRRRRRRROR",e);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove try catch here. If it fails, we need to know. Catching is not correct.

if (whenCondition != null && !expressionEvaluator.evaluateConditional(whenCondition, event)) {
resultRecords.add(record);
continue;
}
try {
if (currentBufferPerBatch.getEventCount() == 0) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line and next few lines look same as processor code

@dlvenable
Copy link
Member

Please rebase this PR and force push so that it doesn't have any conflicts. This will help make it clearer what we are reviewing.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants