Skip to content

Commit

Permalink
Added HTTP error detail provider and refactored HTTP-sink and source …
Browse files Browse the repository at this point in the history
…package to handle error provider and fix sonar issues
  • Loading branch information
Amit-CloudSufi committed Dec 9, 2024
1 parent 5a3f8ab commit 5712df2
Show file tree
Hide file tree
Showing 13 changed files with 414 additions and 237 deletions.
120 changes: 120 additions & 0 deletions src/main/java/io/cdap/plugin/http/common/HttpErrorDetailsProvider.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.http.common;

import com.google.common.base.Throwables;
import io.cdap.cdap.api.exception.ErrorCategory;
import io.cdap.cdap.api.exception.ErrorType;
import io.cdap.cdap.api.exception.ErrorUtils;
import io.cdap.cdap.api.exception.ProgramFailureException;
import io.cdap.cdap.etl.api.exception.ErrorContext;
import io.cdap.cdap.etl.api.exception.ErrorDetailsProvider;
import io.cdap.cdap.etl.api.validation.InvalidConfigPropertyException;

import java.util.List;

import java.util.NoSuchElementException;

/**
* Error details provided for the HTTP
**/
public class HttpErrorDetailsProvider implements ErrorDetailsProvider {
@Override
public ProgramFailureException getExceptionDetails(Exception e, ErrorContext errorContext) {
List<Throwable> causalChain = Throwables.getCausalChain(e);
for (Throwable t : causalChain) {
if (t instanceof ProgramFailureException) {
// if causal chain already has program failure exception, return null to avoid double wrap.
return null;
}
if (t instanceof IllegalArgumentException) {
return getProgramFailureException((IllegalArgumentException) t, errorContext);
}
if (t instanceof IllegalStateException) {
return getProgramFailureException((IllegalStateException) t, errorContext);
}
if (t instanceof InvalidConfigPropertyException) {
return getProgramFailureException((InvalidConfigPropertyException) t, errorContext);
}
if (t instanceof NoSuchElementException) {
return getProgramFailureException((NoSuchElementException) t, errorContext);
}
}
return null;
}

/**
* Get a ProgramFailureException with the given error
* information from {@link IllegalArgumentException}.
*
* @param e The IllegalArgumentException to get the error information from.
* @return A ProgramFailureException with the given error information.
*/
private ProgramFailureException getProgramFailureException(IllegalArgumentException e, ErrorContext errorContext) {
String errorMessage = e.getMessage();
String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s";
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
errorMessage,
String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), ErrorType.USER, false, e);
}

/**
* Get a ProgramFailureException with the given error
* information from {@link IllegalStateException}.
*
* @param e The IllegalStateException to get the error information from.
* @return A ProgramFailureException with the given error information.
*/
private ProgramFailureException getProgramFailureException(IllegalStateException e, ErrorContext errorContext) {
String errorMessage = e.getMessage();
String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s";
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
errorMessage,
String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), ErrorType.SYSTEM, false, e);
}

/**
* Get a ProgramFailureException with the given error
* information from {@link InvalidConfigPropertyException}.
*
* @param e The InvalidConfigPropertyException to get the error information from.
* @return A ProgramFailureException with the given error information.
*/
private ProgramFailureException getProgramFailureException(InvalidConfigPropertyException e,
ErrorContext errorContext) {
String errorMessage = e.getMessage();
String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s";
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
errorMessage,
String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), ErrorType.SYSTEM, false, e);
}

/**
* Get a ProgramFailureException with the given error
* information from {@link NoSuchElementException}.
*
* @param e The NoSuchElementException to get the error information from.
* @return A ProgramFailureException with the given error information.
*/
private ProgramFailureException getProgramFailureException(NoSuchElementException e, ErrorContext errorContext) {
String errorMessage = e.getMessage();
String errorMessageFormat = "Error occurred in the phase: '%s'. Error message: %s";
return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
errorMessage,
String.format(errorMessageFormat, errorContext.getPhase(), errorMessage), ErrorType.SYSTEM, false, e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,16 @@ public class HTTPOutputFormat extends OutputFormat<StructuredRecord, StructuredR
static final String INPUT_SCHEMA_KEY = "http.sink.input.schema";

@Override
public RecordWriter<StructuredRecord, StructuredRecord> getRecordWriter(TaskAttemptContext context)
throws IOException {
public RecordWriter<StructuredRecord, StructuredRecord> getRecordWriter(TaskAttemptContext context) {
Configuration hConf = context.getConfiguration();
HTTPSinkConfig config = GSON.fromJson(hConf.get(CONFIG_KEY), HTTPSinkConfig.class);
Schema inputSchema = Schema.parseJson(hConf.get(INPUT_SCHEMA_KEY));
return new HTTPRecordWriter(config, inputSchema);
Schema inputSchema;
try {
inputSchema = Schema.parseJson(hConf.get(INPUT_SCHEMA_KEY));
return new HTTPRecordWriter(config, inputSchema);
} catch (IOException e) {
throw new IllegalStateException("Unable to parse the input schema. Reason: " + e.getMessage(), e);
}
}

@Override
Expand Down
Loading

0 comments on commit 5712df2

Please sign in to comment.