diff --git a/docs/learn/documentation/versioned/operations/monitoring.md b/docs/learn/documentation/versioned/operations/monitoring.md index c11fbe0786..0b45b68f30 100644 --- a/docs/learn/documentation/versioned/operations/monitoring.md +++ b/docs/learn/documentation/versioned/operations/monitoring.md @@ -369,6 +369,7 @@ All \, \, \, \, \, are popula | | expired-preferred-host-requests | Number of expired resource-requests-for -preferred-host received by the cluster manager. | | | expired-any-host-requests | Number of expired resource-requests-for -any-host received by the cluster manager. | | | host-affinity-match-pct | Percentage of non-expired preferred host requests. This measures the % of resource-requests for which host-affinity provided the preferred host. | +| | \-failure-count | Number of times a container identified by containerId has failed | | **Group** | **Metric name** | **Meaning** | | --- | --- | --- | diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java index 1bc1669c60..8507e4e794 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java @@ -191,7 +191,7 @@ public ContainerProcessManager(Config config, SamzaApplicationState state, Metri this.jobConfig = new JobConfig(clusterManagerConfig); this.hostAffinityEnabled = clusterManagerConfig.getHostAffinityEnabled(); - + this.containerProcessManagerMetrics = new ContainerProcessManagerMetrics(jobConfig, state, registry); this.clusterResourceManager = resourceManager; this.containerManager = containerManager; this.diagnosticsManager = Option.empty(); @@ -254,6 +254,11 @@ public void start() { }); containerAllocator.requestResources(processorToHost); + // Initialize the per processor failure count to be 0 + processorToHostMapping.keySet().forEach(processorId -> { + containerProcessManagerMetrics.registerProcessorFailureCountMetric(processorId); + }); + // Start container allocator thread LOG.info("Starting the container allocator thread"); allocatorThread.start(); @@ -490,6 +495,7 @@ void onResourceCompletedWithUnknownStatus(SamzaResourceStatus resourceStatus, St LOG.info("Container ID: {} for Processor ID: {} failed with exit code: {}.", containerId, processorId, exitStatus); Instant now = Instant.now(); state.failedContainers.incrementAndGet(); + containerProcessManagerMetrics.incrementProcessorFailureCountMetric(processorId); state.jobHealthy.set(false); state.neededProcessors.incrementAndGet(); diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala index 91fec2886c..ea3b924b44 100644 --- a/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala +++ b/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala @@ -64,4 +64,19 @@ class ContainerProcessManagerMetrics(val config: Config, val mContainerMemoryMb = newGauge("container-memory-mb", () => clusterManagerConfig.getContainerMemoryMb) val mContainerCpuCores = newGauge("container-cpu-cores", () => clusterManagerConfig.getNumCores) + + + /** + * Maitains the map of processorId 0,1,2 to failure count for one container + */ + val mPerContainerFailureCount = collection.mutable.Map[String, Gauge[Int]]() + + def registerProcessorFailureCountMetric(processorId: String) { + mPerContainerFailureCount.put(processorId, newGauge("container_" + processorId + "-failure-count", 0)) + } + + def incrementProcessorFailureCountMetric(processorId: String) { + mPerContainerFailureCount.get(processorId).get.set(mPerContainerFailureCount.get(processorId).get.getValue + 1) + } + } diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java index 5e550cf6ae..8789d207bb 100644 --- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java +++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java @@ -87,6 +87,8 @@ public class TestContainerProcessManager { }; private Config config = new MapConfig(configVals); private ContainerPlacementMetadataStore containerPlacementMetadataStore; + private CoordinatorStreamStore coordinatorStreamStore; + private ContainerProcessManager cpm; private Config getConfig() { Map map = new HashMap<>(); @@ -117,7 +119,7 @@ private JobModelManager getJobModelManager(int containerCount) { public void setup() throws Exception { server = new MockHttpServer("/", 7777, null, new ServletHolder(DefaultServlet.class)); CoordinatorStreamStoreTestUtil coordinatorStreamStoreTestUtil = new CoordinatorStreamStoreTestUtil(config); - CoordinatorStreamStore coordinatorStreamStore = coordinatorStreamStoreTestUtil.getCoordinatorStreamStore(); + coordinatorStreamStore = coordinatorStreamStoreTestUtil.getCoordinatorStreamStore(); coordinatorStreamStore.init(); containerPlacementMetadataStore = new ContainerPlacementMetadataStore(coordinatorStreamStore); containerPlacementMetadataStore.start(); @@ -145,8 +147,7 @@ public void testContainerProcessManager() throws Exception { .thenReturn(new LocalityModel(ImmutableMap.of("0", new ProcessorLocality("0", "host1")))); ContainerManager containerManager = buildContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false, mockLocalityManager); - ContainerProcessManager cpm = - buildContainerProcessManager(new ClusterManagerConfig(new MapConfig(conf)), state, clusterResourceManager, Optional.empty()); + cpm = buildContainerProcessManager(new ClusterManagerConfig(new MapConfig(conf)), state, clusterResourceManager, Optional.empty()); ContainerAllocator allocator = (ContainerAllocator) getPrivateFieldFromCpm("containerAllocator", cpm).get(cpm); @@ -192,8 +193,7 @@ public void testOnInit() throws Exception { buildContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, clusterManagerConfig.getHostAffinityEnabled(), false); - ContainerProcessManager cpm = - buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.empty()); + cpm = buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.empty()); MockContainerAllocatorWithoutHostAffinity allocator = new MockContainerAllocatorWithoutHostAffinity( clusterResourceManager, @@ -223,7 +223,7 @@ public void run() { assertEquals(1, state.neededProcessors.get()); assertEquals(1, allocator.requestedContainers); - cpm.stop(); + } @Test @@ -234,8 +234,7 @@ public void testOnShutdown() throws Exception { MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state); ClusterManagerConfig clusterManagerConfig = spy(new ClusterManagerConfig(conf)); - ContainerProcessManager cpm = - buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.empty()); + cpm = buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.empty()); cpm.start(); Thread allocatorThread = (Thread) getPrivateFieldFromCpm("allocatorThread", cpm).get(cpm); @@ -266,8 +265,7 @@ public void testCpmShouldStopWhenContainersFinish() throws Exception { state, containerManager); - ContainerProcessManager cpm = - spy(buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator))); + cpm = spy(buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator))); // start triggers a request cpm.start(); @@ -314,8 +312,7 @@ public void testNewContainerRequestedOnFailureWithUnknownCode() throws Exception state, containerManager); - ContainerProcessManager cpm = spy( - buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator))); + cpm = spy(buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator))); // start triggers a request cpm.start(); @@ -373,7 +370,7 @@ public void testNewContainerRequestedOnFailureWithUnknownCode() throws Exception assertTrue(cpm.shouldShutdown()); assertEquals(SamzaApplicationState.SamzaAppStatus.FAILED, state.status); - cpm.stop(); + } /** @@ -413,8 +410,7 @@ private void testContainerRequestedRetriesExceedingWindowOnFailureWithUnknownCod state, containerManager); - ContainerProcessManager cpm = - buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator)); + cpm = buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator)); // start triggers a request cpm.start(); @@ -467,7 +463,7 @@ private void testContainerRequestedRetriesExceedingWindowOnFailureWithUnknownCod assertEquals(false, cpm.getJobFailureCriteriaMet()); assertEquals(1, cpm.getProcessorFailures().get(processorId).getCount()); - cpm.stop(); + } @Test @@ -509,8 +505,7 @@ private void testContainerRequestedRetriesNotExceedingWindowOnFailureWithUnknown state, containerManager); - ContainerProcessManager cpm = - buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator), mockLocalityManager); + cpm = buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator), mockLocalityManager); // start triggers a request cpm.start(); @@ -598,7 +593,7 @@ private void testContainerRequestedRetriesNotExceedingWindowOnFailureWithUnknown assertEquals(0, allocator.getContainerRequestState().numPendingRequests()); assertEquals(0, allocator.getContainerRequestState().numDelayedRequests()); - cpm.stop(); + } @Test @@ -619,8 +614,7 @@ public void testInvalidNotificationsAreIgnored() throws Exception { state, containerManager); - ContainerProcessManager cpm = - spy(buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator))); + cpm = spy(buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator))); // Start the task clusterResourceManager cpm.start(); @@ -700,8 +694,7 @@ public void testAllBufferedResourcesAreUtilized() throws Exception { state, containerManager); - ContainerProcessManager cpm = - spy(buildContainerProcessManager(new ClusterManagerConfig(cfg), state, clusterResourceManager, Optional.of(allocator), mockLocalityManager)); + cpm = spy(buildContainerProcessManager(new ClusterManagerConfig(cfg), state, clusterResourceManager, Optional.of(allocator), mockLocalityManager)); cpm.start(); assertFalse(cpm.shouldShutdown()); @@ -765,8 +758,7 @@ public void testDuplicateNotificationsDoNotAffectJobHealth() throws Exception { state, containerManager); - ContainerProcessManager cpm = - spy(buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator))); + cpm = spy(buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator))); // Start the task manager cpm.start(); @@ -840,8 +832,7 @@ public void testNewContainerRequestedOnFailureWithKnownCode() throws Exception { state, containerManager); - ContainerProcessManager cpm = - spy(buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator))); + cpm = spy(buildContainerProcessManager(clusterManagerConfig, state, clusterResourceManager, Optional.of(allocator))); // Start the task clusterResourceManager cpm.start(); @@ -913,13 +904,16 @@ public void testNewContainerRequestedOnFailureWithKnownCode() throws Exception { assertFalse(cpm.shouldShutdown()); assertFalse(state.jobHealthy.get()); assertEquals(ResourceRequestState.ANY_HOST, allocator.getContainerRequestState().peekPendingRequest().getPreferredHost()); - - cpm.stop(); } @After public void teardown() { + if (cpm != null) { + cpm.stop(); + } server.stop(); + containerPlacementMetadataStore.stop(); + coordinatorStreamStore.close(); } private ContainerManager buildContainerManager(ContainerPlacementMetadataStore containerPlacementMetadataStore,