Skip to content

Commit

Permalink
Merge pull request #286 from rpmoore/null_node_id
Browse files Browse the repository at this point in the history
Minor refactoring for JobPartTracker.  Added a workaround for a NPE t…
  • Loading branch information
Denver authored Jul 15, 2016
2 parents 31b685b + b250474 commit 818abdc
Show file tree
Hide file tree
Showing 7 changed files with 143 additions and 52 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

allprojects {
group = 'com.spectralogic.ds3'
version = '3.0.3'
version = '3.0.4'
}

subprojects {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package com.spectralogic.ds3client.helpers;

import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
Expand Down Expand Up @@ -61,15 +62,16 @@ public void transferChunks(
final Iterable<Objects> chunks)
throws SignatureException, IOException, XmlProcessingException {
LOG.debug("Getting ready to process chunks");
final Map<UUID, JobNode> nodeMap = buildNodeMap(nodes);
final ImmutableMap<UUID, JobNode> nodeMap = buildNodeMap(nodes);
LOG.debug("Starting executor service");
final ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(maxParallelRequests));
LOG.debug("Executor service started");
try {
final List<ListenableFuture<?>> tasks = new ArrayList<>();
for (final Objects chunk : chunks) {
LOG.debug("Processing parts for chunk: " + chunk.getChunkId().toString());
final Ds3Client client = mainClient.newForNode(nodeMap.get(chunk.getNodeId()));

final Ds3Client client = getClient(nodeMap, chunk.getNodeId(), mainClient);
for (final BulkObject ds3Object : chunk.getObjects()) {
final ObjectPart part = new ObjectPart(ds3Object.getOffset(), ds3Object.getLength());
if (this.partTracker.containsPart(ds3Object.getName(), part)) {
Expand All @@ -93,12 +95,23 @@ public Object call() throws Exception {
}
}

private static Map<UUID, JobNode> buildNodeMap(final Iterable<JobNode> nodes) {
final Map<UUID, JobNode> nodeMap = new HashMap<>();
for(final JobNode node: nodes) {
private static Ds3Client getClient(final ImmutableMap<UUID, JobNode> nodeMap, final UUID nodeId, final Ds3Client mainClient) {
final JobNode jobNode = nodeMap.get(nodeId);

if (jobNode == null) {
LOG.warn("The jobNode was not found, returning the existing client");
return mainClient;
}

return mainClient.newForNode(jobNode);
}

private static ImmutableMap<UUID, JobNode> buildNodeMap(final Iterable<JobNode> nodes) {
final ImmutableMap.Builder<UUID, JobNode> nodeMap = ImmutableMap.builder();
for (final JobNode node: nodes) {
nodeMap.put(node.getId(), node);
}
return nodeMap;
return nodeMap.build();
}

private static void executeWithExceptionHandling(final List<ListenableFuture<?>> tasks)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,51 +15,16 @@

package com.spectralogic.ds3client.helpers;

import java.util.Map;

/**
* This class manages parts for all of the objects in the job. It aggregates
* ObjectPartTracker implementations, which manage the parts for a single
* object.
*/
public class JobPartTracker {
private final Map<String, ObjectPartTracker> trackers;

public JobPartTracker(final Map<String, ObjectPartTracker> trackers) {
this.trackers = trackers;
}

public void completePart(final String key, final ObjectPart objectPart) {
trackers.get(key).completePart(objectPart);
}

public boolean containsPart(final String key, final ObjectPart objectPart) {
return trackers.get(key).containsPart(objectPart);
}

public JobPartTracker attachDataTransferredListener(final DataTransferredListener listener) {
for (final ObjectPartTracker tracker : this.trackers.values()) {
tracker.attachDataTransferredListener(listener);
}
return this;
}

public JobPartTracker attachObjectCompletedListener(final ObjectCompletedListener listener) {
for (final ObjectPartTracker tracker : this.trackers.values()) {
tracker.attachObjectCompletedListener(listener);
}
return this;
}

public void removeDataTransferredListener(final DataTransferredListener listener) {
for (final ObjectPartTracker tracker : this.trackers.values()) {
tracker.removeDataTransferredListener(listener);
}
}

public void removeObjectCompletedListener(final ObjectCompletedListener listener) {
for (final ObjectPartTracker tracker : this.trackers.values()) {
tracker.removeObjectCompletedListener(listener);
}
}
public interface JobPartTracker {
void completePart(final String key, final ObjectPart objectPart);
boolean containsPart(final String key, final ObjectPart objectPart);
JobPartTracker attachDataTransferredListener(final DataTransferredListener listener);
JobPartTracker attachObjectCompletedListener(final ObjectCompletedListener listener);
void removeDataTransferredListener(final DataTransferredListener listener);
void removeObjectCompletedListener(final ObjectCompletedListener listener);
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public static JobPartTracker buildPartTracker(final Iterable<BulkObject> objects
for (final BulkObject bulkObject : Preconditions.checkNotNull(objects)) {
multimap.put(bulkObject.getName(), new ObjectPart(bulkObject.getOffset(), bulkObject.getLength()));
}
return new JobPartTracker(new HashMap<>(Maps.transformEntries(
return new JobPartTrackerImpl(new HashMap<>(Maps.transformEntries(
multimap.asMap(),
new BuildObjectPartTrackerFromObjectPartGroup()
)));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* ******************************************************************************
* Copyright 2014-2016 Spectra Logic Corporation. All Rights Reserved.
* Licensed under the Apache License, Version 2.0 (the "License"). You may not use
* this file except in compliance with the License. A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file.
* This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
* ****************************************************************************
*/

package com.spectralogic.ds3client.helpers;

import java.util.Map;

public class JobPartTrackerImpl implements JobPartTracker {

private final Map<String, ObjectPartTracker> trackers;

public JobPartTrackerImpl(final Map<String, ObjectPartTracker> trackers) {
this.trackers = trackers;
}

public void completePart(final String key, final ObjectPart objectPart) {
trackers.get(key).completePart(objectPart);
}

public boolean containsPart(final String key, final ObjectPart objectPart) {
return trackers.get(key).containsPart(objectPart);
}

public JobPartTracker attachDataTransferredListener(final DataTransferredListener listener) {
for (final ObjectPartTracker tracker : this.trackers.values()) {
tracker.attachDataTransferredListener(listener);
}
return this;
}

public JobPartTracker attachObjectCompletedListener(final ObjectCompletedListener listener) {
for (final ObjectPartTracker tracker : this.trackers.values()) {
tracker.attachObjectCompletedListener(listener);
}
return this;
}

public void removeDataTransferredListener(final DataTransferredListener listener) {
for (final ObjectPartTracker tracker : this.trackers.values()) {
tracker.removeDataTransferredListener(listener);
}
}

public void removeObjectCompletedListener(final ObjectCompletedListener listener) {
for (final ObjectPartTracker tracker : this.trackers.values()) {
tracker.removeObjectCompletedListener(listener);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* ******************************************************************************
* Copyright 2014-2016 Spectra Logic Corporation. All Rights Reserved.
* Licensed under the Apache License, Version 2.0 (the "License"). You may not use
* this file except in compliance with the License. A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file.
* This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
* ****************************************************************************
*/
package com.spectralogic.ds3client.helpers;

import com.google.common.collect.ImmutableList;
import com.spectralogic.ds3client.Ds3Client;
import com.spectralogic.ds3client.models.JobNode;
import com.spectralogic.ds3client.models.Objects;
import com.spectralogic.ds3client.serializer.XmlProcessingException;
import org.junit.Test;

import java.io.IOException;
import java.security.SignatureException;
import java.util.ArrayList;
import java.util.UUID;

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

public class ChunkTransferrer_Test {

@Test
public void nullJobNode() throws XmlProcessingException, SignatureException, IOException {
final Ds3Client client = mock(Ds3Client.class);

final ChunkTransferrer chunkTransferrer = new ChunkTransferrer(null, client, null, 1);
final ImmutableList.Builder<Objects> objectsBuilder = ImmutableList.builder();

final Objects objects = new Objects();
objects.setChunkId(UUID.randomUUID());
objects.setChunkNumber(0);
objectsBuilder.add(objects);

chunkTransferrer.transferChunks(new ArrayList<JobNode>(), objectsBuilder.build());

verify(client, times(0)).newForNode((JobNode) any());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public void trackerCallsTrackers() {
final Map<String, ObjectPartTracker> trackers = new HashMap<>();
trackers.put("foo", fooTracker);
trackers.put("bar", barTracker);
final JobPartTracker jobPartTracker = new JobPartTracker(trackers);
final JobPartTracker jobPartTracker = new JobPartTrackerImpl(trackers);

final ObjectPart[] partsRemoved = {
new ObjectPart(10, 11),
Expand Down Expand Up @@ -65,7 +65,7 @@ public void TrackerEventsForward()
final List<Long> sizes = new ArrayList<>();
final List<String> objects = new ArrayList<>();

final JobPartTracker jobPartTracker = new JobPartTracker(trackers);
final JobPartTracker jobPartTracker = new JobPartTrackerImpl(trackers);
jobPartTracker.attachDataTransferredListener(new DataTransferredListener() {
@Override
public void dataTransferred(final long size) {
Expand Down

0 comments on commit 818abdc

Please sign in to comment.