Skip to content

Commit

Permalink
support bulk v1 through proxy
Browse files Browse the repository at this point in the history
support bulk v1 through proxy by using the same approach for HTTP GET as for other HTTP methods (POST, PUT, DELETE).
  • Loading branch information
ashitsalesforce committed Sep 23, 2024
1 parent 2fd1dab commit 24da09c
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -384,12 +384,12 @@ private int getNumRecordsFailedInJob() {
return numRecordsFailedInJob;
}
BatchInfoList getBatches() throws AsyncApiException {
BulkConnection connectionClient = this.controller.getBulkV1Client().getConnection();
BulkV1Connection connectionClient = this.controller.getBulkV1Client().getConnection();
return connectionClient.getBatchInfoList(getJobId());
}

CSVReader getBatchResults(String batchId) throws AsyncApiException {
BulkConnection connectionClient = this.controller.getBulkV1Client().getConnection();
BulkV1Connection connectionClient = this.controller.getBulkV1Client().getConnection();
return new CSVReader(connectionClient.getBatchResultStream(getJobId(), batchId));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,29 @@
*/
package com.salesforce.dataloader.action.visitor.bulk;

import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.salesforce.dataloader.client.ClientBase;
import com.salesforce.dataloader.client.HttpTransportInterface;
import com.salesforce.dataloader.exception.HttpClientTransportException;
import com.salesforce.dataloader.util.AppUtil;
import com.sforce.async.AsyncApiException;
import com.sforce.async.AsyncExceptionCode;
import com.sforce.async.BatchInfoList;
import com.sforce.async.BulkConnection;
import com.sforce.async.ContentType;
import com.sforce.async.JobInfo;
import com.sforce.async.QueryResultList;
import com.sforce.ws.ConnectionException;
import com.sforce.ws.ConnectorConfig;
import com.sforce.ws.parser.PullParserException;
import com.sforce.ws.parser.XmlInputStream;

public class BulkV1Connection extends BulkConnection {
private static Logger logger = LogManager.getLogger(BulkV1Connection.class);
Expand All @@ -49,4 +65,80 @@ public void addHeader(String headerName, String headerValue) {
logger.debug(ClientBase.SFORCE_CALL_OPTIONS_HEADER + " : " + headerValue);
}
}

public JobInfo getJobStatus(String jobId) throws AsyncApiException {
return getJobStatus(jobId, ContentType.XML);
}

public JobInfo getJobStatus(String jobId, ContentType contentType) throws AsyncApiException {
String endpoint = getBulkEndpoint() + "job/" + jobId;
InputStream in = invokeBulkV1GET(endpoint);
return processBulkV1Get(in, contentType, JobInfo.class);
}

public BatchInfoList getBatchInfoList(String jobId) throws AsyncApiException {
return getBatchInfoList(jobId, ContentType.XML);
}

public BatchInfoList getBatchInfoList(String jobId, ContentType contentType) throws AsyncApiException {
String endpoint = getBulkEndpoint() + "job/" + jobId + "/batch/";
InputStream in = invokeBulkV1GET(endpoint);
return processBulkV1Get(in, contentType, BatchInfoList.class);
}

public InputStream getBatchResultStream(String jobId, String batchId) throws AsyncApiException {
String endpoint = getBulkEndpoint() + "job/" + jobId + "/batch/" + batchId + "/result";
return invokeBulkV1GET(endpoint);
}

public QueryResultList getQueryResultList(String jobId, String batchId) throws AsyncApiException {
return getQueryResultList(jobId, batchId, ContentType.XML);
}

public QueryResultList getQueryResultList(String jobId, String batchId, ContentType contentType) throws AsyncApiException {
InputStream in = getBatchResultStream(jobId, batchId);
return processBulkV1Get(in, contentType, QueryResultList.class);
}

public InputStream getQueryResultStream(String jobId, String batchId, String resultId) throws AsyncApiException {
String endpoint = getBulkEndpoint() + "job/" + jobId + "/batch/" + batchId + "/result" + "/" + resultId;
return invokeBulkV1GET(endpoint);
}

private String getBulkEndpoint() {
String endpoint = getConfig().getRestEndpoint();
endpoint = endpoint.endsWith("/") ? endpoint : endpoint + "/";
return endpoint;
}

private InputStream invokeBulkV1GET(String endpoint) throws AsyncApiException {
try {
endpoint = endpoint.endsWith("/") ? endpoint : endpoint + "/";
HttpTransportInterface transport = (HttpTransportInterface) getConfig().createTransport();
return transport.httpGet(endpoint);

} catch (IOException | HttpClientTransportException | ConnectionException e) {
logger.error(e.getMessage());
throw new AsyncApiException("Failed to get result ", AsyncExceptionCode.ClientInputError, e);
}
}

private <T> T processBulkV1Get(InputStream is, ContentType contentType, Class<T> returnClass) throws AsyncApiException {
try {
if (contentType == ContentType.JSON || contentType == ContentType.ZIP_JSON) {
return AppUtil.deserializeJsonToObject(is, returnClass);
} else {
XmlInputStream xin = new XmlInputStream();
xin.setInput(is, "UTF-8");
@SuppressWarnings("deprecation")
T result = returnClass.newInstance();
Method loadMethod = returnClass.getMethod("load", xin.getClass(), typeMapper.getClass());
loadMethod.invoke(result, xin, typeMapper);
return result;
}
} catch (IOException | PullParserException | InstantiationException | IllegalAccessException | NoSuchMethodException | SecurityException | IllegalArgumentException | InvocationTargetException e) {
logger.error(e.getMessage());
throw new AsyncApiException("Failed to get result ", AsyncExceptionCode.ClientInputError, e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,13 @@ public class HttpClientTransport implements HttpTransportInterface {


private static final String AUTH_HEADER_VALUE_PREFIX = "Bearer ";
private static final String AUTH_HEADER = "Authorization";
private static final String AUTH_HEADER_FOR_JSON = "Authorization";
private static final String AUTH_HEADER_FOR_XML = "X-SFDC-Session";
private static final String USER_AGENT_HEADER = "User-Agent";

private static ConnectorConfig currentConfig = null;
private boolean successful;
private HttpRequestBase httpMethod;
private HttpRequestBase httpMethod = null;
private OutputStream output;
private ByteArrayOutputStream entityByteOut;
private static CloseableHttpClient currentHttpClient = null;
Expand Down Expand Up @@ -141,12 +143,26 @@ private boolean areEquivalentConfigs(ConnectorConfig config1, ConnectorConfig co

private boolean isHttpClientInitialized = false;

private void setAuthAndClientHeadersForHttpMethod() {
if (this.httpMethod != null
&& currentConfig.getSessionId() != null
&& !currentConfig.getSessionId().isBlank()) {
this.httpMethod.removeHeaders(AUTH_HEADER_FOR_JSON);
this.httpMethod.removeHeaders(AUTH_HEADER_FOR_XML);
this.httpMethod.removeHeaders(USER_AGENT_HEADER);
this.httpMethod.addHeader(AUTH_HEADER_FOR_JSON, AUTH_HEADER_VALUE_PREFIX + currentConfig.getSessionId());
this.httpMethod.addHeader(AUTH_HEADER_FOR_XML, AUTH_HEADER_VALUE_PREFIX + currentConfig.getSessionId());
this.httpMethod.addHeader(USER_AGENT_HEADER, VersionInfo.info());
}
}

private synchronized void initializeHttpClient() throws UnknownHostException {
if (isHttpClientInitialized) {
// already initialized.
return;
}
closeConnections();
httpMethod = null;
HttpClientBuilder httpClientBuilder = HttpClientBuilder.create().useSystemProperties();

if (currentConfig != null
Expand Down Expand Up @@ -196,8 +212,6 @@ private synchronized void initializeHttpClient() throws UnknownHostException {
public synchronized InputStream getContent() throws IOException {
serverInvocationCount++;
initializeHttpClient();
this.httpMethod.addHeader(AUTH_HEADER, AUTH_HEADER_VALUE_PREFIX + currentConfig.getSessionId());
this.httpMethod.addHeader("User-Agent", VersionInfo.info());
if (this.httpMethod instanceof HttpEntityEnclosingRequestBase
&& ((HttpEntityEnclosingRequestBase)this.httpMethod).getEntity() == null) {
byte[] entityBytes = entityByteOut.toByteArray();
Expand Down Expand Up @@ -295,6 +309,7 @@ private OutputStream doConnect(String endpoint,
SupportedHttpMethodType httpMethodType,
InputStream requestInputStream,
String contentTypeStr) throws IOException {
initializeHttpClient();
switch (httpMethodType) {
case GET :
this.httpMethod = new HttpGet(endpoint);
Expand All @@ -316,9 +331,7 @@ private OutputStream doConnect(String endpoint,
this.httpMethod.addHeader(name, httpHeaders.get(name));
}
}
this.httpMethod.addHeader(AUTH_HEADER, AUTH_HEADER_VALUE_PREFIX + currentConfig.getSessionId());
this.httpMethod.addHeader("User-Agent", VersionInfo.info());

setAuthAndClientHeadersForHttpMethod();
if (requestInputStream != null) {
ContentType contentType = ContentType.DEFAULT_TEXT;
if (contentTypeStr != null) {
Expand Down

0 comments on commit 24da09c

Please sign in to comment.