Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SAMZA-2582: Add a metric to track container failure tracking metric for Samza #1417

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,7 @@ All \<system\>, \<stream\>, \<partition\>, \<store-name\>, \<topic\>, are popula
| | expired-preferred-host-requests | Number of expired resource-requests-for -preferred-host received by the cluster manager. |
| | expired-any-host-requests | Number of expired resource-requests-for -any-host received by the cluster manager. |
| | host-affinity-match-pct | Percentage of non-expired preferred host requests. This measures the % of resource-requests for which host-affinity provided the preferred host. |
| | \<containerId\>-failure-count | Number of times a container identified by containerId has failed |
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we decided to use "processorId" for 0,1,2..

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that lingo is used internally in code as the naming conventions for javadocs, this is public-facing metrics page where we do not need to have context between processorId and containerId


| **Group** | **Metric name** | **Meaning** |
| --- | --- | --- |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ public ContainerProcessManager(Config config, SamzaApplicationState state, Metri
this.jobConfig = new JobConfig(clusterManagerConfig);

this.hostAffinityEnabled = clusterManagerConfig.getHostAffinityEnabled();

this.containerProcessManagerMetrics = new ContainerProcessManagerMetrics(jobConfig, state, registry);
this.clusterResourceManager = resourceManager;
this.containerManager = containerManager;
this.diagnosticsManager = Option.empty();
Expand Down Expand Up @@ -254,6 +254,11 @@ public void start() {
});
containerAllocator.requestResources(processorToHost);

// Initialize the per processor failure count to be 0
processorToHostMapping.keySet().forEach(processorId -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See comment below on how/why this information isnt really in "state"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

replied there

containerProcessManagerMetrics.registerProcessorFailureCountMetric(processorId);
});

// Start container allocator thread
LOG.info("Starting the container allocator thread");
allocatorThread.start();
Expand Down Expand Up @@ -490,6 +495,7 @@ void onResourceCompletedWithUnknownStatus(SamzaResourceStatus resourceStatus, St
LOG.info("Container ID: {} for Processor ID: {} failed with exit code: {}.", containerId, processorId, exitStatus);
Instant now = Instant.now();
state.failedContainers.incrementAndGet();
containerProcessManagerMetrics.incrementProcessorFailureCountMetric(processorId);
state.jobHealthy.set(false);

state.neededProcessors.incrementAndGet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,19 @@ class ContainerProcessManagerMetrics(val config: Config,

val mContainerMemoryMb = newGauge("container-memory-mb", () => clusterManagerConfig.getContainerMemoryMb)
val mContainerCpuCores = newGauge("container-cpu-cores", () => clusterManagerConfig.getNumCores)


/**
* Maitains the map of processorId 0,1,2 to failure count for one container
*/
val mPerContainerFailureCount = collection.mutable.Map[String, Gauge[Int]]()

def registerProcessorFailureCountMetric(processorId: String) {
mPerContainerFailureCount.put(processorId, newGauge("container_" + processorId + "-failure-count", 0))
}

def incrementProcessorFailureCountMetric(processorId: String) {
mPerContainerFailureCount.get(processorId).get.set(mPerContainerFailureCount.get(processorId).get.getValue + 1)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ public class TestContainerProcessManager {
};
private Config config = new MapConfig(configVals);
private ContainerPlacementMetadataStore containerPlacementMetadataStore;
private CoordinatorStreamStore coordinatorStreamStore;
private ContainerProcessManager cpm;

private Config getConfig() {
Map<String, String> map = new HashMap<>();
Expand Down Expand Up @@ -117,7 +119,7 @@ private JobModelManager getJobModelManager(int containerCount) {
public void setup() throws Exception {
server = new MockHttpServer("/", 7777, null, new ServletHolder(DefaultServlet.class));
CoordinatorStreamStoreTestUtil coordinatorStreamStoreTestUtil = new CoordinatorStreamStoreTestUtil(config);
CoordinatorStreamStore coordinatorStreamStore = coordinatorStreamStoreTestUtil.getCoordinatorStreamStore();
coordinatorStreamStore = coordinatorStreamStoreTestUtil.getCoordinatorStreamStore();
coordinatorStreamStore.init();
containerPlacementMetadataStore = new ContainerPlacementMetadataStore(coordinatorStreamStore);
containerPlacementMetadataStore.start();
Expand Down Expand Up @@ -145,8 +147,7 @@ public void testContainerProcessManager() throws Exception {
.thenReturn(new LocalityModel(ImmutableMap.of("0", new ProcessorLocality("0", "host1"))));
ContainerManager containerManager =
buildContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false, mockLocalityManager);
ContainerProcessManager cpm =
buildContainerProcessManager(new ClusterManagerConfig(new MapConfig(conf)), state, clusterResourceManager, Optional.empty());
cpm = buildContainerProcessManager(new ClusterManagerConfig(new MapConfig(conf)), state, clusterResourceManager, Optional.empty());

ContainerAllocator allocator =
(ContainerAllocator) getPrivateFieldFromCpm("containerAllocator", cpm).get(cpm);
Expand Down Expand Up @@ -192,8 +193,7 @@ public void testOnInit() throws Exception {
buildContainerManager(containerPlacementMetadataStore, state, clusterResourceManager,
clusterManagerConfig.getHostAffinityEnabled(), false);

ContainerProcessManager cpm =
buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.empty());
cpm = buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.empty());

MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity(
clusterResourceManager,
Expand Down Expand Up @@ -223,7 +223,7 @@ public void run() {
assertEquals(1, state.neededProcessors.get());
assertEquals(1, allocator.requestedContainers);

cpm.stop();

}

@Test
Expand All @@ -234,8 +234,7 @@ public void testOnShutdown() throws Exception {
MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
ClusterManagerConfig clusterManagerConfig = spy(new ClusterManagerConfig(conf));

ContainerProcessManager cpm =
buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.empty());
cpm = buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.empty());
cpm.start();

Thread allocatorThread = (Thread) getPrivateFieldFromCpm("allocatorThread", cpm).get(cpm);
Expand Down Expand Up @@ -266,8 +265,7 @@ public void testCpmShouldStopWhenContainersFinish() throws Exception {
state,
containerManager);

ContainerProcessManager cpm =
spy(buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator)));
cpm = spy(buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator)));

// start triggers a request
cpm.start();
Expand Down Expand Up @@ -314,8 +312,7 @@ public void testNewContainerRequestedOnFailureWithUnknownCode() throws Exception
state,
containerManager);

ContainerProcessManager cpm = spy(
buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator)));
cpm = spy(buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator)));

// start triggers a request
cpm.start();
Expand Down Expand Up @@ -373,7 +370,7 @@ public void testNewContainerRequestedOnFailureWithUnknownCode() throws Exception
assertTrue(cpm.shouldShutdown());
assertEquals(SamzaApplicationState.SamzaAppStatus.FAILED, state.status);

cpm.stop();

}

/**
Expand Down Expand Up @@ -413,8 +410,7 @@ private void testContainerRequestedRetriesExceedingWindowOnFailureWithUnknownCod
state,
containerManager);

ContainerProcessManager cpm =
buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator));
cpm = buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator));

// start triggers a request
cpm.start();
Expand Down Expand Up @@ -467,7 +463,7 @@ private void testContainerRequestedRetriesExceedingWindowOnFailureWithUnknownCod
assertEquals(false, cpm.getJobFailureCriteriaMet());
assertEquals(1, cpm.getProcessorFailures().get(processorId).getCount());

cpm.stop();

}

@Test
Expand Down Expand Up @@ -509,8 +505,7 @@ private void testContainerRequestedRetriesNotExceedingWindowOnFailureWithUnknown
state,
containerManager);

ContainerProcessManager cpm =
buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator), mockLocalityManager);
cpm = buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator), mockLocalityManager);

// start triggers a request
cpm.start();
Expand Down Expand Up @@ -598,7 +593,7 @@ private void testContainerRequestedRetriesNotExceedingWindowOnFailureWithUnknown
assertEquals(0, allocator.getContainerRequestState().numPendingRequests());
assertEquals(0, allocator.getContainerRequestState().numDelayedRequests());

cpm.stop();

}

@Test
Expand All @@ -619,8 +614,7 @@ public void testInvalidNotificationsAreIgnored() throws Exception {
state,
containerManager);

ContainerProcessManager cpm =
spy(buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator)));
cpm = spy(buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator)));

// Start the task clusterResourceManager
cpm.start();
Expand Down Expand Up @@ -700,8 +694,7 @@ public void testAllBufferedResourcesAreUtilized() throws Exception {
state,
containerManager);

ContainerProcessManager cpm =
spy(buildContainerProcessManager(new ClusterManagerConfig(cfg), state, clusterResourceManager, Optional.of(allocator), mockLocalityManager));
cpm = spy(buildContainerProcessManager(new ClusterManagerConfig(cfg), state, clusterResourceManager, Optional.of(allocator), mockLocalityManager));

cpm.start();
assertFalse(cpm.shouldShutdown());
Expand Down Expand Up @@ -765,8 +758,7 @@ public void testDuplicateNotificationsDoNotAffectJobHealth() throws Exception {
state,
containerManager);

ContainerProcessManager cpm =
spy(buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator)));
cpm = spy(buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator)));

// Start the task manager
cpm.start();
Expand Down Expand Up @@ -840,8 +832,7 @@ public void testNewContainerRequestedOnFailureWithKnownCode() throws Exception {
state,
containerManager);

ContainerProcessManager cpm =
spy(buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator)));
cpm = spy(buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator)));

// Start the task clusterResourceManager
cpm.start();
Expand Down Expand Up @@ -913,13 +904,16 @@ public void testNewContainerRequestedOnFailureWithKnownCode() throws Exception {
assertFalse(cpm.shouldShutdown());
assertFalse(state.jobHealthy.get());
assertEquals(ResourceRequestState.ANY_HOST, allocator.getContainerRequestState().peekPendingRequest().getPreferredHost());

cpm.stop();
}

@After
public void teardown() {
if (cpm != null) {
cpm.stop();
}
server.stop();
containerPlacementMetadataStore.stop();
coordinatorStreamStore.close();
}

private ContainerManager buildContainerManager(ContainerPlacementMetadataStore containerPlacementMetadataStore,
Expand Down