Skip to content

Commit

Permalink
Merge pull request #171 from cloudsufi/feat/http-sink-patch-support
Browse files Browse the repository at this point in the history
[PLUGIN-1810] Add support for HTTP PATCH
  • Loading branch information
psainics authored Nov 5, 2024
2 parents b0019d4 + c7ef6c7 commit 341cb03
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 13 deletions.
38 changes: 33 additions & 5 deletions src/main/java/io/cdap/plugin/http/sink/batch/HTTPRecordWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.plugin.http.common.RetryPolicy;
import io.cdap.plugin.http.common.error.ErrorHandling;
import io.cdap.plugin.http.common.error.HttpErrorHandler;
import io.cdap.plugin.http.common.error.RetryableErrorHandling;
import io.cdap.plugin.http.common.http.HttpRequest;
import io.cdap.plugin.http.common.http.HttpResponse;
import io.cdap.plugin.http.common.http.OAuthUtil;

import org.apache.hadoop.mapreduce.RecordWriter;
Expand Down Expand Up @@ -83,6 +85,7 @@ public class HTTPRecordWriter extends RecordWriter<StructuredRecord, StructuredR
public static final String REQUEST_METHOD_POST = "POST";
public static final String REQUEST_METHOD_PUT = "PUT";
public static final String REQUEST_METHOD_DELETE = "DELETE";
public static final String REQUEST_METHOD_PATCH = "PATCH";

private final HTTPSinkConfig config;
private final MessageBuffer messageBuffer;
Expand All @@ -96,6 +99,7 @@ public class HTTPRecordWriter extends RecordWriter<StructuredRecord, StructuredR
private final HttpErrorHandler httpErrorHandler;
private final PollInterval pollInterval;
private int httpStatusCode;
private String httpResponseBody;
private static int retryCount;

HTTPRecordWriter(HTTPSinkConfig config, Schema inputSchema) {
Expand All @@ -120,11 +124,13 @@ public class HTTPRecordWriter extends RecordWriter<StructuredRecord, StructuredR
@Override
public void write(StructuredRecord input, StructuredRecord unused) throws IOException {
configURL = url;
if (config.getMethod().equals(REQUEST_METHOD_POST) || config.getMethod().equals(REQUEST_METHOD_PUT)) {
if (config.getMethod().equals(REQUEST_METHOD_POST) || config.getMethod().equals(REQUEST_METHOD_PUT) ||
config.getMethod().equals(REQUEST_METHOD_PATCH)) {
messageBuffer.add(input);
}

if (config.getMethod().equals(REQUEST_METHOD_PUT) || config.getMethod().equals(REQUEST_METHOD_DELETE)
if (config.getMethod().equals(REQUEST_METHOD_PUT) || config.getMethod().equals(REQUEST_METHOD_PATCH) ||
config.getMethod().equals(REQUEST_METHOD_DELETE)
&& !placeHolderList.isEmpty()) {
configURL = updateURLWithPlaceholderValue(input);
}
Expand Down Expand Up @@ -200,9 +206,9 @@ private boolean executeHTTPServiceAndCheckStatusCode() throws IOException {
request.setHeaders(getRequestHeaders());

response = httpClient.execute(request);

httpStatusCode = response.getStatusLine().getStatusCode();
LOG.debug("Response HTTP Status code: {}", httpStatusCode);
httpResponseBody = new HttpResponse(response).getBody();

} catch (MalformedURLException | ProtocolException e) {
throw new IllegalStateException("Error opening url connection. Reason: " + e.getMessage(), e);
Expand Down Expand Up @@ -277,7 +283,9 @@ private Header[] getRequestHeaders() throws IOException {
headers.put("Instance-Follow-Redirects", String.valueOf(config.getFollowRedirects()));
headers.put("charset", config.getCharset());

if (config.getMethod().equals(REQUEST_METHOD_POST) || config.getMethod().equals(REQUEST_METHOD_PUT)) {
if (config.getMethod().equals(REQUEST_METHOD_POST)
|| config.getMethod().equals(REQUEST_METHOD_PATCH)
|| config.getMethod().equals(REQUEST_METHOD_PUT)) {
if (!headers.containsKey("Content-Type")) {
headers.put("Content-Type", contentType);
}
Expand All @@ -302,7 +310,8 @@ private Header getAuthorizationHeader(AccessToken accessToken) {
*/
private List<PlaceholderBean> getPlaceholderListFromURL() {
List<PlaceholderBean> placeholderList = new ArrayList<>();
if (!(config.getMethod().equals(REQUEST_METHOD_PUT) || config.getMethod().equals(REQUEST_METHOD_DELETE))) {
if (!(config.getMethod().equals(REQUEST_METHOD_PUT) || config.getMethod().equals(REQUEST_METHOD_PATCH) ||
config.getMethod().equals(REQUEST_METHOD_DELETE))) {
return placeholderList;
}
Pattern pattern = Pattern.compile(REGEX_HASHED_VAR);
Expand Down Expand Up @@ -351,6 +360,25 @@ private void flushMessageBuffer() {
"after the batch execution. " + e);
}
messageBuffer.clear();

ErrorHandling postRetryStrategy = httpErrorHandler.getErrorHandlingStrategy(httpStatusCode)
.getAfterRetryStrategy();

switch (postRetryStrategy) {
case SUCCESS:
break;
case STOP:
throw new IllegalStateException(String.format("Fetching from url '%s' returned status code '%d' and body '%s'",
config.getUrl(), httpStatusCode, httpResponseBody));
case SKIP:
case SEND:
LOG.warn(String.format("Fetching from url '%s' returned status code '%d' and body '%s'",
config.getUrl(), httpStatusCode, httpResponseBody));
break;
default:
throw new IllegalArgumentException(String.format("Unexpected http error handling: '%s'", postRetryStrategy));
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public class HTTPSinkConfig extends BaseHttpConfig {
private static final String REGEX_HASHED_VAR = "#(\\w+)";
private static final String PLACEHOLDER = "#";
private static final Set<String> METHODS = ImmutableSet.of(HttpMethod.GET, HttpMethod.POST,
HttpMethod.PUT, HttpMethod.DELETE);
HttpMethod.PUT, HttpMethod.DELETE, "PATCH");

@Name(URL)
@Description("The URL to post data to. Additionally, a placeholder like #columnName can be added to the URL that " +
Expand Down Expand Up @@ -462,7 +462,6 @@ public void validate(FailureCollector collector) {
if (!containsMacro(PROPERTY_RETRY_POLICY) && getRetryPolicy() == RetryPolicy.LINEAR) {
assertIsSet(getLinearRetryInterval(), PROPERTY_LINEAR_RETRY_INTERVAL, "retry policy is linear");
}

if (!containsMacro(READ_TIMEOUT) && Objects.nonNull(readTimeout) && readTimeout < 0) {
collector.addFailure("Read Timeout cannot be a negative number.", null)
.withConfigProperty(READ_TIMEOUT);
Expand Down Expand Up @@ -494,7 +493,7 @@ public void validateSchema(@Nullable Schema schema, FailureCollector collector)
return;
}

if ((method.equals("PUT") || method.equals("DELETE")) && url.contains(PLACEHOLDER)) {
if ((method.equals("PUT") || method.equals("PATCH") || method.equals("DELETE")) && url.contains(PLACEHOLDER)) {
Pattern pattern = Pattern.compile(REGEX_HASHED_VAR);
Matcher matcher = pattern.matcher(url);
List<String> fieldNames = fields.stream().map(field -> field.getName()).collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.cdap.plugin.http.common.pagination.page.PageFormat;
import io.cdap.plugin.http.source.common.DelimitedSchemaDetector;
import io.cdap.plugin.http.source.common.RawStringPerLine;
import org.apache.http.client.methods.CloseableHttpResponse;

import java.io.IOException;
import java.util.Map;
Expand Down Expand Up @@ -68,8 +69,14 @@ public Schema getSchema(FailureCollector failureCollector) {
case TSV:
String delimiter = format == PageFormat.CSV ? "," : "\t";
try (HttpClient client = new HttpClient(config)) {
RawStringPerLine rawStringPerLine = new RawStringPerLine(
new HttpResponse(client.executeHTTP(config.getUrl())));
CloseableHttpResponse closeableHttpResponse = client.executeHTTP(config.getUrl());
int statusCode = closeableHttpResponse.getStatusLine().getStatusCode();
if (statusCode < 200 || statusCode >= 300) {
failureCollector.addFailure(String.format("Failed to read the file, non 2xx status code received: %s",
statusCode), null);
return null;
}
RawStringPerLine rawStringPerLine = new RawStringPerLine(new HttpResponse(closeableHttpResponse));
return DelimitedSchemaDetector.detectSchema(config, delimiter, rawStringPerLine, failureCollector);
} catch (IOException e) {
String errorMessage = e.getMessage();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,18 @@ public void testValidInputSchema() {
Assert.assertTrue(collector.getValidationFailures().isEmpty());
}

@Test()
public void testValidInputWithPlaceHoldersWithPATCH() {
Schema schema = Schema.recordOf("record",
Schema.Field.of("id", Schema.of(Schema.Type.LONG)),
Schema.Field.of("name", Schema.of(Schema.Type.STRING)));
String dynamicUrl = "http://example.com/api/v1/book/#id";
HTTPSinkConfig config = HTTPSinkConfig.newBuilder(VALID_CONFIG).setMethod("PATCH").setUrl(dynamicUrl).build();
MockFailureCollector collector = new MockFailureCollector("httpsinkwithvalidinputschema");
config.validateSchema(schema, collector);
Assert.assertTrue(collector.getValidationFailures().isEmpty());
}

@Test(expected = ValidationException.class)
public void testHTTPSinkWithNegativeBatchSize() {
HTTPSinkConfig config = HTTPSinkConfig.newBuilder(VALID_CONFIG)
Expand Down
3 changes: 2 additions & 1 deletion widgets/HTTP-batchsink.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
"POST",
"GET",
"PUT",
"DELETE"
"DELETE",
"PATCH"
],
"default": "POST"
}
Expand Down
3 changes: 2 additions & 1 deletion widgets/HTTP-batchsource.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
"POST",
"PUT",
"DELETE",
"HEAD"
"HEAD",
"PATCH"
],
"default": "GET"
}
Expand Down
3 changes: 2 additions & 1 deletion widgets/HTTP-streamingsource.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
"POST",
"PUT",
"DELETE",
"HEAD"
"HEAD",
"PATCH"
],
"default": "GET"
}
Expand Down

0 comments on commit 341cb03

Please sign in to comment.