Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Pub/Sub to Elasticsearch] Fix error handling #319

Open
wants to merge 7 commits into
base: main
Choose a base branch
from

Conversation

Cherepushko
Copy link
Contributor

This pull request contains many changes in custom ElasticsearchIO to be able to handle failed inserts. Now ElasticsearchIO.Write is not a terminal step but returns JSON with error message to be sent to error-output-topic.

Now failed inserts don't slow down the pipeline and the throughput remain stable.

It's a draft PR and additional refactoring can be required.

@Cherepushko
Copy link
Contributor Author

@prathapreddy123 @suyograo

@suyograo
Copy link

suyograo commented Feb 8, 2022

@Cherepushko can you rebase please? Also, can you add in the description and code comments we are removing invalid fields (with only dot as name) so Elasticsearch doesn't throw exception and slow the throughput considerably.

@suyograo
Copy link

suyograo commented Feb 8, 2022

Also left a comment for adding unit test for the method that removes dots from k8s logs

@suyograo
Copy link

LGTM

@@ -27,6 +27,7 @@
</parent>

<artifactId>elasticsearch-common</artifactId>
<version>1.0-SNAPSHOT</version>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this file needs to be included as current version in master branch is already 1.0-SNAPSHOT

@@ -69,7 +69,7 @@ public static Builder newBuilder() {
private static final String DOCUMENT_TYPE = "_doc";

@Override
public PDone expand(PCollection<String> jsonStrings) {
public PCollection<String> expand(PCollection<String> jsonStrings) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this function uses ElasticsearchIO.write() which still returns PDone can you explain how can we get PCollection

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We get PCollection in modified ElasticsearchIO

@@ -19,6 +19,10 @@ To enable the out of the box integration:
1. Install the Google Cloud [Logs integration](https://www.elastic.co/integrations?solution=observability&category=google-cloud) from Kibana UI
2. [Export logs](https://cloud.google.com/logging/docs/export) from above data source to a separate Pub/Sub subscription. You can then use the Dataflow template's `dataset` parameter to specify the data source name and their corresponding `inputSubscription`.

## Data validation

Invalid input fields in messages will be fixed in runtime to not slowdown the pipeline and to prevent throwing exceptions.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add some examples that were being taken care since code might not be handling all invalid fields

@@ -1,5 +1,5 @@
/*
* Copyright (C) 1260 Google LLC
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since templates already uses 2.36 version should we directly consider using Beam ElasticSearch IO and drop the teleport version. However this needs proper validation of all 3 existing templates

<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
<version>2.4.0</version>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you upgrade to most recent version (2.6 or 2.7)

try {
outputMessage = OBJECT_MAPPER.readTree(input);
} catch (JsonProcessingException e) {
e.printStackTrace();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoid using printStackTrace instead log the warning message using Log.warn

String input = context.element();
ObjectNode node = null;
try {
JsonPath.using(configuration)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the use of this code as neither output was capture nor an exception was raised in case of any error

} catch (Exception ignored) {
}

JsonNode requestApp = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since there is a lot of repeated code, add functions shown below and call the functions for different jpaths (i.e requestAPP, responseApp etc)

  1. parseJson(String json, String jpath) to handle read functionality
  2. updateJson(String json, ... other parameters) to handle put functionality
  3. deleteJsonString json, String jpath) to handle delete functionality

context.output(node.toString());
}

public static ObjectNode removeEmptyFields(final ObjectNode jsonNode) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add Java doc comments to all the methods

import org.slf4j.LoggerFactory;

/** ProcessValidateJsonFields is used to fix malformed fields, (ex. de-dot). */
public class ProcessValidateJsonFields
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better name would be ResolveInvalidJsonFields

@stale
Copy link

stale bot commented Apr 18, 2022

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

@stale stale bot added the stale label Apr 18, 2022
@bvolpato
Copy link
Contributor

Thanks for contributing!

There have been code reviews here that were not addressed (and the same thing about the structure of the project changing) -- do we still want to work on this PR?

@cuigle
Copy link

cuigle commented Aug 29, 2023

Hi team, I'm a Google Cloud TAM that has a customer that needs this issue to be addressed. They have 30+ projects that sends logs to ElasticSearch and this error constantly appears. They will need the Pub/Sub to Elasticsearch template to catch such errors and make necessary formatting changes so that all logs from Google Cloud are accepted on ElasticSearch side. Could you please reopen this issue?

@vidok
Copy link
Contributor

vidok commented Mar 26, 2024

Hi team, I'm a Google Cloud TAM that has a customer that needs this issue to be addressed. They have 30+ projects that sends logs to ElasticSearch and this error constantly appears. They will need the Pub/Sub to Elasticsearch template to catch such errors and make necessary formatting changes so that all logs from Google Cloud are accepted on ElasticSearch side. Could you please reopen this issue?

Hey @cuigle! It's been a while but could you let me know if your customer still needs this improvement? Our team at Elastic consider maintaining this template and we can pick up this issue. It would be helpful if you share more detailed information about the issues your customer 's been facing.

Thanks.

@github-actions github-actions bot removed the stale label Jul 26, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants