diff --git a/src/main/java/com/salesforce/dataloader/action/visitor/bulk/BulkApiVisitorUtil.java b/src/main/java/com/salesforce/dataloader/action/visitor/bulk/BulkApiVisitorUtil.java index d09a649f..12479d8f 100644 --- a/src/main/java/com/salesforce/dataloader/action/visitor/bulk/BulkApiVisitorUtil.java +++ b/src/main/java/com/salesforce/dataloader/action/visitor/bulk/BulkApiVisitorUtil.java @@ -384,12 +384,12 @@ private int getNumRecordsFailedInJob() { return numRecordsFailedInJob; } BatchInfoList getBatches() throws AsyncApiException { - BulkConnection connectionClient = this.controller.getBulkV1Client().getConnection(); + BulkV1Connection connectionClient = this.controller.getBulkV1Client().getConnection(); return connectionClient.getBatchInfoList(getJobId()); } CSVReader getBatchResults(String batchId) throws AsyncApiException { - BulkConnection connectionClient = this.controller.getBulkV1Client().getConnection(); + BulkV1Connection connectionClient = this.controller.getBulkV1Client().getConnection(); return new CSVReader(connectionClient.getBatchResultStream(getJobId(), batchId)); } diff --git a/src/main/java/com/salesforce/dataloader/action/visitor/bulk/BulkV1Connection.java b/src/main/java/com/salesforce/dataloader/action/visitor/bulk/BulkV1Connection.java index bc7bf0b3..df886cc9 100644 --- a/src/main/java/com/salesforce/dataloader/action/visitor/bulk/BulkV1Connection.java +++ b/src/main/java/com/salesforce/dataloader/action/visitor/bulk/BulkV1Connection.java @@ -25,13 +25,29 @@ */ package com.salesforce.dataloader.action.visitor.bulk; +import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import com.salesforce.dataloader.client.ClientBase; +import com.salesforce.dataloader.client.HttpTransportInterface; +import com.salesforce.dataloader.exception.HttpClientTransportException; +import com.salesforce.dataloader.util.AppUtil; import com.sforce.async.AsyncApiException; +import com.sforce.async.AsyncExceptionCode; +import com.sforce.async.BatchInfoList; import com.sforce.async.BulkConnection; +import com.sforce.async.ContentType; +import com.sforce.async.JobInfo; +import com.sforce.async.QueryResultList; +import com.sforce.ws.ConnectionException; import com.sforce.ws.ConnectorConfig; +import com.sforce.ws.parser.PullParserException; +import com.sforce.ws.parser.XmlInputStream; public class BulkV1Connection extends BulkConnection { private static Logger logger = LogManager.getLogger(BulkV1Connection.class); @@ -49,4 +65,80 @@ public void addHeader(String headerName, String headerValue) { logger.debug(ClientBase.SFORCE_CALL_OPTIONS_HEADER + " : " + headerValue); } } + + public JobInfo getJobStatus(String jobId) throws AsyncApiException { + return getJobStatus(jobId, ContentType.XML); + } + + public JobInfo getJobStatus(String jobId, ContentType contentType) throws AsyncApiException { + String endpoint = getBulkEndpoint() + "job/" + jobId; + InputStream in = invokeBulkV1GET(endpoint); + return processBulkV1Get(in, contentType, JobInfo.class); + } + + public BatchInfoList getBatchInfoList(String jobId) throws AsyncApiException { + return getBatchInfoList(jobId, ContentType.XML); + } + + public BatchInfoList getBatchInfoList(String jobId, ContentType contentType) throws AsyncApiException { + String endpoint = getBulkEndpoint() + "job/" + jobId + "/batch/"; + InputStream in = invokeBulkV1GET(endpoint); + return processBulkV1Get(in, contentType, BatchInfoList.class); + } + + public InputStream getBatchResultStream(String jobId, String batchId) throws AsyncApiException { + String endpoint = getBulkEndpoint() + "job/" + jobId + "/batch/" + batchId + "/result"; + return invokeBulkV1GET(endpoint); + } + + public QueryResultList getQueryResultList(String jobId, String batchId) throws AsyncApiException { + return getQueryResultList(jobId, batchId, ContentType.XML); + } + + public QueryResultList getQueryResultList(String jobId, String batchId, ContentType contentType) throws AsyncApiException { + InputStream in = getBatchResultStream(jobId, batchId); + return processBulkV1Get(in, contentType, QueryResultList.class); + } + + public InputStream getQueryResultStream(String jobId, String batchId, String resultId) throws AsyncApiException { + String endpoint = getBulkEndpoint() + "job/" + jobId + "/batch/" + batchId + "/result" + "/" + resultId; + return invokeBulkV1GET(endpoint); + } + + private String getBulkEndpoint() { + String endpoint = getConfig().getRestEndpoint(); + endpoint = endpoint.endsWith("/") ? endpoint : endpoint + "/"; + return endpoint; + } + + private InputStream invokeBulkV1GET(String endpoint) throws AsyncApiException { + try { + endpoint = endpoint.endsWith("/") ? endpoint : endpoint + "/"; + HttpTransportInterface transport = (HttpTransportInterface) getConfig().createTransport(); + return transport.httpGet(endpoint); + + } catch (IOException | HttpClientTransportException | ConnectionException e) { + logger.error(e.getMessage()); + throw new AsyncApiException("Failed to get result ", AsyncExceptionCode.ClientInputError, e); + } + } + + private T processBulkV1Get(InputStream is, ContentType contentType, Class returnClass) throws AsyncApiException { + try { + if (contentType == ContentType.JSON || contentType == ContentType.ZIP_JSON) { + return AppUtil.deserializeJsonToObject(is, returnClass); + } else { + XmlInputStream xin = new XmlInputStream(); + xin.setInput(is, "UTF-8"); + @SuppressWarnings("deprecation") + T result = returnClass.newInstance(); + Method loadMethod = returnClass.getMethod("load", xin.getClass(), typeMapper.getClass()); + loadMethod.invoke(result, xin, typeMapper); + return result; + } + } catch (IOException | PullParserException | InstantiationException | IllegalAccessException | NoSuchMethodException | SecurityException | IllegalArgumentException | InvocationTargetException e) { + logger.error(e.getMessage()); + throw new AsyncApiException("Failed to get result ", AsyncExceptionCode.ClientInputError, e); + } + } } \ No newline at end of file diff --git a/src/main/java/com/salesforce/dataloader/client/HttpClientTransport.java b/src/main/java/com/salesforce/dataloader/client/HttpClientTransport.java index bb05843d..2e84e0e5 100644 --- a/src/main/java/com/salesforce/dataloader/client/HttpClientTransport.java +++ b/src/main/java/com/salesforce/dataloader/client/HttpClientTransport.java @@ -79,11 +79,13 @@ public class HttpClientTransport implements HttpTransportInterface { private static final String AUTH_HEADER_VALUE_PREFIX = "Bearer "; - private static final String AUTH_HEADER = "Authorization"; + private static final String AUTH_HEADER_FOR_JSON = "Authorization"; + private static final String AUTH_HEADER_FOR_XML = "X-SFDC-Session"; + private static final String USER_AGENT_HEADER = "User-Agent"; private static ConnectorConfig currentConfig = null; private boolean successful; - private HttpRequestBase httpMethod; + private HttpRequestBase httpMethod = null; private OutputStream output; private ByteArrayOutputStream entityByteOut; private static CloseableHttpClient currentHttpClient = null; @@ -141,12 +143,26 @@ private boolean areEquivalentConfigs(ConnectorConfig config1, ConnectorConfig co private boolean isHttpClientInitialized = false; + private void setAuthAndClientHeadersForHttpMethod() { + if (this.httpMethod != null + && currentConfig.getSessionId() != null + && !currentConfig.getSessionId().isBlank()) { + this.httpMethod.removeHeaders(AUTH_HEADER_FOR_JSON); + this.httpMethod.removeHeaders(AUTH_HEADER_FOR_XML); + this.httpMethod.removeHeaders(USER_AGENT_HEADER); + this.httpMethod.addHeader(AUTH_HEADER_FOR_JSON, AUTH_HEADER_VALUE_PREFIX + currentConfig.getSessionId()); + this.httpMethod.addHeader(AUTH_HEADER_FOR_XML, AUTH_HEADER_VALUE_PREFIX + currentConfig.getSessionId()); + this.httpMethod.addHeader(USER_AGENT_HEADER, VersionInfo.info()); + } + } + private synchronized void initializeHttpClient() throws UnknownHostException { if (isHttpClientInitialized) { // already initialized. return; } closeConnections(); + httpMethod = null; HttpClientBuilder httpClientBuilder = HttpClientBuilder.create().useSystemProperties(); if (currentConfig != null @@ -196,8 +212,6 @@ private synchronized void initializeHttpClient() throws UnknownHostException { public synchronized InputStream getContent() throws IOException { serverInvocationCount++; initializeHttpClient(); - this.httpMethod.addHeader(AUTH_HEADER, AUTH_HEADER_VALUE_PREFIX + currentConfig.getSessionId()); - this.httpMethod.addHeader("User-Agent", VersionInfo.info()); if (this.httpMethod instanceof HttpEntityEnclosingRequestBase && ((HttpEntityEnclosingRequestBase)this.httpMethod).getEntity() == null) { byte[] entityBytes = entityByteOut.toByteArray(); @@ -295,6 +309,7 @@ private OutputStream doConnect(String endpoint, SupportedHttpMethodType httpMethodType, InputStream requestInputStream, String contentTypeStr) throws IOException { + initializeHttpClient(); switch (httpMethodType) { case GET : this.httpMethod = new HttpGet(endpoint); @@ -316,9 +331,7 @@ private OutputStream doConnect(String endpoint, this.httpMethod.addHeader(name, httpHeaders.get(name)); } } - this.httpMethod.addHeader(AUTH_HEADER, AUTH_HEADER_VALUE_PREFIX + currentConfig.getSessionId()); - this.httpMethod.addHeader("User-Agent", VersionInfo.info()); - + setAuthAndClientHeadersForHttpMethod(); if (requestInputStream != null) { ContentType contentType = ContentType.DEFAULT_TEXT; if (contentTypeStr != null) {