Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable HttpPostRequestCallback to fail requests #124

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## [Unreleased]

### Added

- Added support for callbacks to mark requests as failed.

## [0.15.0] - 2024-07-30

### Added
Expand Down
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,18 @@ and then reference identifier `rest-lookup-logger` in the HTTP lookup DDL proper
is provided.


- Callback Errors:

Throw a [FailedRequestException](src/main/java/com/getindata/connectors/http/FailedRequestException.java) to indicate a
failed request.

This allows control over the connector's behavior when an HTTP response does not meet your expectations
whether based on the response body or headers.

Currently, the only side effect is to incremenet the [numRecordsSendErrors counter](https://github.com/getindata/flink-http-connector?tab=readme-ov-file#http-sink-2), as the connector does not
support retries yet. However, once retry functionality is implemented, this will allow users to specify if requests should be retried.


## HTTP status code handler
Http Sink and Lookup Source connectors allow defining list of HTTP status codes that should be treated as errors.
By default all 400s and 500s response codes will be interpreted as error code.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.getindata.connectors.http;

/**
* Exception thrown from a {@link HttpPostRequestCallback} when a request should be considered as failed.
*
* <p>This exception is caught by the {@link com.getindata.connectors.http.internal.sink.httpclient.JavaNetSinkHttpClient}
* and {@link com.getindata.connectors.http.internal.table.lookup.JavaNetHttpPollingClient}
*/
public class FailedRequestException extends Exception {
public FailedRequestException(String message) {
super(message);
}

public FailedRequestException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ void call(
RequestT requestEntry,
String endpointUrl,
Map<String, String> headerMap
);
) throws FailedRequestException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.apache.flink.annotation.VisibleForTesting;

import com.getindata.connectors.http.HttpPostRequestCallback;
import com.getindata.connectors.http.FailedRequestException;
import com.getindata.connectors.http.internal.HeaderPreprocessor;
import com.getindata.connectors.http.internal.SinkHttpClient;
import com.getindata.connectors.http.internal.SinkHttpClientResponse;
Expand Down Expand Up @@ -98,13 +99,20 @@ private SinkHttpClientResponse prepareSinkHttpClientResponse(
for (var response : responses) {
var sinkRequestEntry = response.getHttpRequest();
var optResponse = response.getResponse();

httpPostRequestCallback.call(
optResponse.orElse(null), sinkRequestEntry, endpointUrl, headerMap);
var failedCallback = false;

try {
httpPostRequestCallback.call(
optResponse.orElse(null), sinkRequestEntry, endpointUrl, headerMap);
} catch (FailedRequestException e) {
failedCallback = true;
log.debug("FailedRequestException thrown by httpPostRequestCallback", e);
}

// TODO Add response processor here and orchestrate it with statusCodeChecker.
if (optResponse.isEmpty() ||
statusCodeChecker.isErrorCode(optResponse.get().statusCode())) {
statusCodeChecker.isErrorCode(optResponse.get().statusCode()) ||
failedCallback) {
failedResponses.add(sinkRequestEntry);
} else {
successfulResponses.add(sinkRequestEntry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.apache.flink.util.StringUtils;

import com.getindata.connectors.http.HttpPostRequestCallback;
import com.getindata.connectors.http.FailedRequestException;
import com.getindata.connectors.http.internal.PollingClient;
import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker;
Expand Down Expand Up @@ -89,7 +90,14 @@ private Optional<RowData> processHttpResponse(
HttpResponse<String> response,
HttpLookupSourceRequestEntry request) throws IOException {

this.httpPostRequestCallback.call(response, request, "endpoint", Collections.emptyMap());
try {
this.httpPostRequestCallback.call(
response, request, "endpoint", Collections.emptyMap()
);
} catch (FailedRequestException e) {
log.debug("FailedRequestException thrown by httpPostRequestCallback", e);
return Optional.empty();
Copy link
Contributor

@davidradl davidradl Sep 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we not throw an Exception here? There is already precedence as this method already throws throws IOException.

Why is this not log.error - the text says Error?

Copy link
Author

@amstee amstee Sep 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same reason as below, I don't think there's a need to bubble up the exception, just treat this request as failed.

}

if (response == null) {
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package com.getindata.connectors.http.internal.sink.httpclient;

import java.io.File;
import java.net.http.HttpResponse;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import com.github.tomakehurst.wiremock.WireMockServer;
import org.junit.jupiter.api.AfterEach;
Expand All @@ -19,6 +22,8 @@
import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertThrows;

import com.getindata.connectors.http.HttpPostRequestCallback;
import com.getindata.connectors.http.FailedRequestException;
import com.getindata.connectors.http.internal.HttpsConnectionTestBase;
import com.getindata.connectors.http.internal.SinkHttpClientResponse;
import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
Expand Down Expand Up @@ -62,6 +67,29 @@ public void testHttpConnection() {
batchRequestSubmitterFactory);
}

@Test
public void testHttpPostRequestCallbackWithFailedRequestException() throws ExecutionException, InterruptedException {
wireMockServer = new WireMockServer(SERVER_PORT);
wireMockServer.start();
mockEndPoint(wireMockServer);

JavaNetSinkHttpClient client =
new JavaNetSinkHttpClient(
properties,
new TestPostRequestCallbackWithException(),
headerPreprocessor,
perRequestSubmitterFactory);
HttpSinkRequestEntry requestEntry = new HttpSinkRequestEntry("GET", new byte[0]);
SinkHttpClientResponse response =
client.putRequests(
Collections.singletonList(requestEntry),
"https://localhost:" + HTTPS_SERVER_PORT + ENDPOINT
).get();

assertThat(response.getSuccessfulRequests()).isEmpty();
assertThat(response.getFailedRequests()).isNotEmpty();
}

@Test
public void testHttpsConnectionWithSelfSignedCert() {

Expand Down Expand Up @@ -366,4 +394,17 @@ private void mockEndPointWithBasicAuth(WireMockServer wireMockServer) {
.withBody("{}"))
);
}

public static class TestPostRequestCallbackWithException
implements HttpPostRequestCallback<HttpRequest> {
@Override
public void call(
HttpResponse<String> response,
HttpRequest requestEntry,
String endpointUrl,
Map<String, String> headerMap
) throws FailedRequestException {
throw new FailedRequestException("Test exception");
}
}
}
Loading