Skip to content

Commit

Permalink
Merge pull request #1275 from ashitsalesforce/master
Browse files Browse the repository at this point in the history
support bulk v1 through proxy
  • Loading branch information
ashitsalesforce committed Sep 23, 2024
2 parents ef1d3c0 + 24da09c commit 97a4c8a
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -384,12 +384,12 @@ private int getNumRecordsFailedInJob() {
return numRecordsFailedInJob;
}
BatchInfoList getBatches() throws AsyncApiException {
BulkConnection connectionClient = this.controller.getBulkV1Client().getConnection();
BulkV1Connection connectionClient = this.controller.getBulkV1Client().getConnection();
return connectionClient.getBatchInfoList(getJobId());
}

CSVReader getBatchResults(String batchId) throws AsyncApiException {
BulkConnection connectionClient = this.controller.getBulkV1Client().getConnection();
BulkV1Connection connectionClient = this.controller.getBulkV1Client().getConnection();
return new CSVReader(connectionClient.getBatchResultStream(getJobId(), batchId));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,29 @@
*/
package com.salesforce.dataloader.action.visitor.bulk;

import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.salesforce.dataloader.client.ClientBase;
import com.salesforce.dataloader.client.HttpTransportInterface;
import com.salesforce.dataloader.exception.HttpClientTransportException;
import com.salesforce.dataloader.util.AppUtil;
import com.sforce.async.AsyncApiException;
import com.sforce.async.AsyncExceptionCode;
import com.sforce.async.BatchInfoList;
import com.sforce.async.BulkConnection;
import com.sforce.async.ContentType;
import com.sforce.async.JobInfo;
import com.sforce.async.QueryResultList;
import com.sforce.ws.ConnectionException;
import com.sforce.ws.ConnectorConfig;
import com.sforce.ws.parser.PullParserException;
import com.sforce.ws.parser.XmlInputStream;

public class BulkV1Connection extends BulkConnection {
private static Logger logger = LogManager.getLogger(BulkV1Connection.class);
Expand All @@ -49,4 +65,80 @@ public void addHeader(String headerName, String headerValue) {
logger.debug(ClientBase.SFORCE_CALL_OPTIONS_HEADER + " : " + headerValue);
}
}

public JobInfo getJobStatus(String jobId) throws AsyncApiException {
return getJobStatus(jobId, ContentType.XML);
}

public JobInfo getJobStatus(String jobId, ContentType contentType) throws AsyncApiException {
String endpoint = getBulkEndpoint() + "job/" + jobId;
InputStream in = invokeBulkV1GET(endpoint);
return processBulkV1Get(in, contentType, JobInfo.class);
}

public BatchInfoList getBatchInfoList(String jobId) throws AsyncApiException {
return getBatchInfoList(jobId, ContentType.XML);
}

public BatchInfoList getBatchInfoList(String jobId, ContentType contentType) throws AsyncApiException {
String endpoint = getBulkEndpoint() + "job/" + jobId + "/batch/";
InputStream in = invokeBulkV1GET(endpoint);
return processBulkV1Get(in, contentType, BatchInfoList.class);
}

public InputStream getBatchResultStream(String jobId, String batchId) throws AsyncApiException {
String endpoint = getBulkEndpoint() + "job/" + jobId + "/batch/" + batchId + "/result";
return invokeBulkV1GET(endpoint);
}

public QueryResultList getQueryResultList(String jobId, String batchId) throws AsyncApiException {
return getQueryResultList(jobId, batchId, ContentType.XML);
}

public QueryResultList getQueryResultList(String jobId, String batchId, ContentType contentType) throws AsyncApiException {
InputStream in = getBatchResultStream(jobId, batchId);
return processBulkV1Get(in, contentType, QueryResultList.class);
}

public InputStream getQueryResultStream(String jobId, String batchId, String resultId) throws AsyncApiException {
String endpoint = getBulkEndpoint() + "job/" + jobId + "/batch/" + batchId + "/result" + "/" + resultId;
return invokeBulkV1GET(endpoint);
}

private String getBulkEndpoint() {
String endpoint = getConfig().getRestEndpoint();
endpoint = endpoint.endsWith("/") ? endpoint : endpoint + "/";
return endpoint;
}

private InputStream invokeBulkV1GET(String endpoint) throws AsyncApiException {
try {
endpoint = endpoint.endsWith("/") ? endpoint : endpoint + "/";
HttpTransportInterface transport = (HttpTransportInterface) getConfig().createTransport();
return transport.httpGet(endpoint);

} catch (IOException | HttpClientTransportException | ConnectionException e) {
logger.error(e.getMessage());
throw new AsyncApiException("Failed to get result ", AsyncExceptionCode.ClientInputError, e);
}
}

private <T> T processBulkV1Get(InputStream is, ContentType contentType, Class<T> returnClass) throws AsyncApiException {
try {
if (contentType == ContentType.JSON || contentType == ContentType.ZIP_JSON) {
return AppUtil.deserializeJsonToObject(is, returnClass);
} else {
XmlInputStream xin = new XmlInputStream();
xin.setInput(is, "UTF-8");
@SuppressWarnings("deprecation")
T result = returnClass.newInstance();
Method loadMethod = returnClass.getMethod("load", xin.getClass(), typeMapper.getClass());
loadMethod.invoke(result, xin, typeMapper);
return result;
}
} catch (IOException | PullParserException | InstantiationException | IllegalAccessException | NoSuchMethodException | SecurityException | IllegalArgumentException | InvocationTargetException e) {
logger.error(e.getMessage());
throw new AsyncApiException("Failed to get result ", AsyncExceptionCode.ClientInputError, e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,13 @@ public class HttpClientTransport implements HttpTransportInterface {


private static final String AUTH_HEADER_VALUE_PREFIX = "Bearer ";
private static final String AUTH_HEADER = "Authorization";
private static final String AUTH_HEADER_FOR_JSON = "Authorization";
private static final String AUTH_HEADER_FOR_XML = "X-SFDC-Session";
private static final String USER_AGENT_HEADER = "User-Agent";

private static ConnectorConfig currentConfig = null;
private boolean successful;
private HttpRequestBase httpMethod;
private HttpRequestBase httpMethod = null;
private OutputStream output;
private ByteArrayOutputStream entityByteOut;
private static CloseableHttpClient currentHttpClient = null;
Expand Down Expand Up @@ -141,12 +143,26 @@ private boolean areEquivalentConfigs(ConnectorConfig config1, ConnectorConfig co

private boolean isHttpClientInitialized = false;

private void setAuthAndClientHeadersForHttpMethod() {
if (this.httpMethod != null
&& currentConfig.getSessionId() != null
&& !currentConfig.getSessionId().isBlank()) {
this.httpMethod.removeHeaders(AUTH_HEADER_FOR_JSON);
this.httpMethod.removeHeaders(AUTH_HEADER_FOR_XML);
this.httpMethod.removeHeaders(USER_AGENT_HEADER);
this.httpMethod.addHeader(AUTH_HEADER_FOR_JSON, AUTH_HEADER_VALUE_PREFIX + currentConfig.getSessionId());
this.httpMethod.addHeader(AUTH_HEADER_FOR_XML, AUTH_HEADER_VALUE_PREFIX + currentConfig.getSessionId());
this.httpMethod.addHeader(USER_AGENT_HEADER, VersionInfo.info());
}
}

private synchronized void initializeHttpClient() throws UnknownHostException {
if (isHttpClientInitialized) {
// already initialized.
return;
}
closeConnections();
httpMethod = null;
HttpClientBuilder httpClientBuilder = HttpClientBuilder.create().useSystemProperties();

if (currentConfig != null
Expand Down Expand Up @@ -196,8 +212,6 @@ private synchronized void initializeHttpClient() throws UnknownHostException {
public synchronized InputStream getContent() throws IOException {
serverInvocationCount++;
initializeHttpClient();
this.httpMethod.addHeader(AUTH_HEADER, AUTH_HEADER_VALUE_PREFIX + currentConfig.getSessionId());
this.httpMethod.addHeader("User-Agent", VersionInfo.info());
if (this.httpMethod instanceof HttpEntityEnclosingRequestBase
&& ((HttpEntityEnclosingRequestBase)this.httpMethod).getEntity() == null) {
byte[] entityBytes = entityByteOut.toByteArray();
Expand Down Expand Up @@ -295,6 +309,7 @@ private OutputStream doConnect(String endpoint,
SupportedHttpMethodType httpMethodType,
InputStream requestInputStream,
String contentTypeStr) throws IOException {
initializeHttpClient();
switch (httpMethodType) {
case GET :
this.httpMethod = new HttpGet(endpoint);
Expand All @@ -316,9 +331,7 @@ private OutputStream doConnect(String endpoint,
this.httpMethod.addHeader(name, httpHeaders.get(name));
}
}
this.httpMethod.addHeader(AUTH_HEADER, AUTH_HEADER_VALUE_PREFIX + currentConfig.getSessionId());
this.httpMethod.addHeader("User-Agent", VersionInfo.info());

setAuthAndClientHeadersForHttpMethod();
if (requestInputStream != null) {
ContentType contentType = ContentType.DEFAULT_TEXT;
if (contentTypeStr != null) {
Expand Down

0 comments on commit 97a4c8a

Please sign in to comment.