Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support bulk v1 through proxy #1275

Merged
merged 1 commit into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -384,12 +384,12 @@ private int getNumRecordsFailedInJob() {
return numRecordsFailedInJob;
}
BatchInfoList getBatches() throws AsyncApiException {
BulkConnection connectionClient = this.controller.getBulkV1Client().getConnection();
BulkV1Connection connectionClient = this.controller.getBulkV1Client().getConnection();
return connectionClient.getBatchInfoList(getJobId());
}

CSVReader getBatchResults(String batchId) throws AsyncApiException {
BulkConnection connectionClient = this.controller.getBulkV1Client().getConnection();
BulkV1Connection connectionClient = this.controller.getBulkV1Client().getConnection();
return new CSVReader(connectionClient.getBatchResultStream(getJobId(), batchId));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,29 @@
*/
package com.salesforce.dataloader.action.visitor.bulk;

import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.salesforce.dataloader.client.ClientBase;
import com.salesforce.dataloader.client.HttpTransportInterface;
import com.salesforce.dataloader.exception.HttpClientTransportException;
import com.salesforce.dataloader.util.AppUtil;
import com.sforce.async.AsyncApiException;
import com.sforce.async.AsyncExceptionCode;
import com.sforce.async.BatchInfoList;
import com.sforce.async.BulkConnection;
import com.sforce.async.ContentType;
import com.sforce.async.JobInfo;
import com.sforce.async.QueryResultList;
import com.sforce.ws.ConnectionException;
import com.sforce.ws.ConnectorConfig;
import com.sforce.ws.parser.PullParserException;
import com.sforce.ws.parser.XmlInputStream;

public class BulkV1Connection extends BulkConnection {
private static Logger logger = LogManager.getLogger(BulkV1Connection.class);
Expand All @@ -49,4 +65,80 @@ public void addHeader(String headerName, String headerValue) {
logger.debug(ClientBase.SFORCE_CALL_OPTIONS_HEADER + " : " + headerValue);
}
}

public JobInfo getJobStatus(String jobId) throws AsyncApiException {
return getJobStatus(jobId, ContentType.XML);
}

public JobInfo getJobStatus(String jobId, ContentType contentType) throws AsyncApiException {
String endpoint = getBulkEndpoint() + "job/" + jobId;
InputStream in = invokeBulkV1GET(endpoint);
return processBulkV1Get(in, contentType, JobInfo.class);
}

public BatchInfoList getBatchInfoList(String jobId) throws AsyncApiException {
return getBatchInfoList(jobId, ContentType.XML);
}

public BatchInfoList getBatchInfoList(String jobId, ContentType contentType) throws AsyncApiException {
String endpoint = getBulkEndpoint() + "job/" + jobId + "/batch/";
InputStream in = invokeBulkV1GET(endpoint);
return processBulkV1Get(in, contentType, BatchInfoList.class);
}

public InputStream getBatchResultStream(String jobId, String batchId) throws AsyncApiException {
String endpoint = getBulkEndpoint() + "job/" + jobId + "/batch/" + batchId + "/result";
return invokeBulkV1GET(endpoint);
}

public QueryResultList getQueryResultList(String jobId, String batchId) throws AsyncApiException {
return getQueryResultList(jobId, batchId, ContentType.XML);
}

public QueryResultList getQueryResultList(String jobId, String batchId, ContentType contentType) throws AsyncApiException {
InputStream in = getBatchResultStream(jobId, batchId);
return processBulkV1Get(in, contentType, QueryResultList.class);
}

public InputStream getQueryResultStream(String jobId, String batchId, String resultId) throws AsyncApiException {
String endpoint = getBulkEndpoint() + "job/" + jobId + "/batch/" + batchId + "/result" + "/" + resultId;
return invokeBulkV1GET(endpoint);
}

private String getBulkEndpoint() {
String endpoint = getConfig().getRestEndpoint();
endpoint = endpoint.endsWith("/") ? endpoint : endpoint + "/";
return endpoint;
}

private InputStream invokeBulkV1GET(String endpoint) throws AsyncApiException {
try {
endpoint = endpoint.endsWith("/") ? endpoint : endpoint + "/";
HttpTransportInterface transport = (HttpTransportInterface) getConfig().createTransport();
return transport.httpGet(endpoint);

} catch (IOException | HttpClientTransportException | ConnectionException e) {
logger.error(e.getMessage());
throw new AsyncApiException("Failed to get result ", AsyncExceptionCode.ClientInputError, e);
}
}

private <T> T processBulkV1Get(InputStream is, ContentType contentType, Class<T> returnClass) throws AsyncApiException {
try {
if (contentType == ContentType.JSON || contentType == ContentType.ZIP_JSON) {
return AppUtil.deserializeJsonToObject(is, returnClass);
} else {
XmlInputStream xin = new XmlInputStream();
xin.setInput(is, "UTF-8");
@SuppressWarnings("deprecation")
T result = returnClass.newInstance();
Method loadMethod = returnClass.getMethod("load", xin.getClass(), typeMapper.getClass());
loadMethod.invoke(result, xin, typeMapper);
return result;
}
} catch (IOException | PullParserException | InstantiationException | IllegalAccessException | NoSuchMethodException | SecurityException | IllegalArgumentException | InvocationTargetException e) {
logger.error(e.getMessage());
throw new AsyncApiException("Failed to get result ", AsyncExceptionCode.ClientInputError, e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,13 @@ public class HttpClientTransport implements HttpTransportInterface {


private static final String AUTH_HEADER_VALUE_PREFIX = "Bearer ";
private static final String AUTH_HEADER = "Authorization";
private static final String AUTH_HEADER_FOR_JSON = "Authorization";
private static final String AUTH_HEADER_FOR_XML = "X-SFDC-Session";
private static final String USER_AGENT_HEADER = "User-Agent";

private static ConnectorConfig currentConfig = null;
private boolean successful;
private HttpRequestBase httpMethod;
private HttpRequestBase httpMethod = null;
private OutputStream output;
private ByteArrayOutputStream entityByteOut;
private static CloseableHttpClient currentHttpClient = null;
Expand Down Expand Up @@ -141,12 +143,26 @@ private boolean areEquivalentConfigs(ConnectorConfig config1, ConnectorConfig co

private boolean isHttpClientInitialized = false;

private void setAuthAndClientHeadersForHttpMethod() {
if (this.httpMethod != null
&& currentConfig.getSessionId() != null
&& !currentConfig.getSessionId().isBlank()) {
this.httpMethod.removeHeaders(AUTH_HEADER_FOR_JSON);
this.httpMethod.removeHeaders(AUTH_HEADER_FOR_XML);
this.httpMethod.removeHeaders(USER_AGENT_HEADER);
this.httpMethod.addHeader(AUTH_HEADER_FOR_JSON, AUTH_HEADER_VALUE_PREFIX + currentConfig.getSessionId());
this.httpMethod.addHeader(AUTH_HEADER_FOR_XML, AUTH_HEADER_VALUE_PREFIX + currentConfig.getSessionId());
this.httpMethod.addHeader(USER_AGENT_HEADER, VersionInfo.info());
}
}

private synchronized void initializeHttpClient() throws UnknownHostException {
if (isHttpClientInitialized) {
// already initialized.
return;
}
closeConnections();
httpMethod = null;
HttpClientBuilder httpClientBuilder = HttpClientBuilder.create().useSystemProperties();

if (currentConfig != null
Expand Down Expand Up @@ -196,8 +212,6 @@ private synchronized void initializeHttpClient() throws UnknownHostException {
public synchronized InputStream getContent() throws IOException {
serverInvocationCount++;
initializeHttpClient();
this.httpMethod.addHeader(AUTH_HEADER, AUTH_HEADER_VALUE_PREFIX + currentConfig.getSessionId());
this.httpMethod.addHeader("User-Agent", VersionInfo.info());
if (this.httpMethod instanceof HttpEntityEnclosingRequestBase
&& ((HttpEntityEnclosingRequestBase)this.httpMethod).getEntity() == null) {
byte[] entityBytes = entityByteOut.toByteArray();
Expand Down Expand Up @@ -295,6 +309,7 @@ private OutputStream doConnect(String endpoint,
SupportedHttpMethodType httpMethodType,
InputStream requestInputStream,
String contentTypeStr) throws IOException {
initializeHttpClient();
switch (httpMethodType) {
case GET :
this.httpMethod = new HttpGet(endpoint);
Expand All @@ -316,9 +331,7 @@ private OutputStream doConnect(String endpoint,
this.httpMethod.addHeader(name, httpHeaders.get(name));
}
}
this.httpMethod.addHeader(AUTH_HEADER, AUTH_HEADER_VALUE_PREFIX + currentConfig.getSessionId());
this.httpMethod.addHeader("User-Agent", VersionInfo.info());

setAuthAndClientHeadersForHttpMethod();
if (requestInputStream != null) {
ContentType contentType = ContentType.DEFAULT_TEXT;
if (contentTypeStr != null) {
Expand Down
Loading