diff --git a/src/main/java/io/antmedia/SystemUtils.java b/src/main/java/io/antmedia/SystemUtils.java index f27dbb57c..57087046c 100644 --- a/src/main/java/io/antmedia/SystemUtils.java +++ b/src/main/java/io/antmedia/SystemUtils.java @@ -1,13 +1,20 @@ package io.antmedia; +import java.io.BufferedReader; import java.io.File; +import java.io.FileReader; import java.io.IOException; import java.lang.management.ManagementFactory; import java.lang.management.OperatingSystemMXBean; import java.lang.reflect.Method; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; import javax.management.MBeanServer; +import org.apache.commons.lang3.StringUtils; import org.bytedeco.javacpp.Pointer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,6 +69,8 @@ public class SystemUtils { public static final String HEAPDUMP_HPROF = "heapdump.hprof"; + public static final long MAX_CONTAINER_MEMORY_LIMIT_BYTES = 109951162777600L; //100TB + public static final String MAX_MEMORY_CGROUP_V2 = "max"; /** * Obtain Operating System's name. @@ -104,6 +113,8 @@ public class SystemUtils { public static final int WINDOWS = 2; public static final int OS_TYPE; + + public static Boolean containerized = null; static { String osName = SystemUtils.osName.toLowerCase(); @@ -227,6 +238,25 @@ public static long osCommittedVirtualMemory() { * @return bytes size */ public static long osTotalPhysicalMemory() { + if(containerized == null){ + containerized = isContainerized(); + } + + if(containerized) { + try{ + return getMemoryLimitFromCgroup(); + + }catch (IOException e) { + logger.debug("Could not get memory limit from c group. {}", e.getMessage()); + } + } + + return getTotalPhysicalMemorySize(); + + } + + public static long getTotalPhysicalMemorySize(){ + final OperatingSystemMXBean osBean = ManagementFactory.getOperatingSystemMXBean(); if (osBean instanceof UnixOperatingSystemMXBean) { return (((UnixOperatingSystemMXBean) osBean).getTotalPhysicalMemorySize()); @@ -240,8 +270,10 @@ public static long osTotalPhysicalMemory() { return -1L; } } + } + /** * Obtain Free Physical Memory from Operating System's RAM. * @@ -273,7 +305,21 @@ public static long osFreePhysicalMemory() { * @return the amount of available physical memory */ public static long osAvailableMemory() { - return Pointer.availablePhysicalBytes(); + if(containerized == null){ + containerized = isContainerized(); + } + + if(containerized) { + try{ + return getMemAvailableFromCgroup(); + + }catch (IOException e) { + logger.debug("Could not get mem available from cgroup. Will return os free physical memory instead."); + return osFreePhysicalMemory(); + } + } + + return availablePhysicalBytes(); } /** @@ -698,7 +744,6 @@ public static void getHeapDump(String filepath) { } } - private static HotSpotDiagnosticMXBean getHotspotMBean() { try { synchronized (SystemUtils.class) { @@ -715,5 +760,118 @@ private static HotSpotDiagnosticMXBean getHotspotMBean() { } return hotspotMBean; } + + public static long availablePhysicalBytes(){ + return Pointer.availablePhysicalBytes(); + } + + public static boolean isContainerized() { + try { + Path dockerEnvPath = Paths.get("/.dockerenv"); + + // 1. Check for .dockerenv file + if (Files.exists(dockerEnvPath)) { + logger.debug("Container detected via .dockerenv file"); + return true; + } + + // 2. Check env variable + String container = System.getenv("container"); + if(StringUtils.isNotBlank(container)){ + logger.debug("Container detected via env variable."); + return true; + } + + // 3. Check cgroup info + Path cgroupPath = Paths.get("/proc/self/cgroup"); + if (Files.exists(cgroupPath)) { + List cgroupContent = Files.readAllLines(cgroupPath); + for (String line : cgroupContent) { + if (line.contains("docker") || + line.contains("lxc") || + line.contains("kubepods") || + line.contains("containerd")) { + logger.debug("Container detected via cgroup: {}", line); + return true; + } + } + } + + } catch (Exception e) { + logger.error("Error during container detection: {}", e.getMessage()); + + } + + return false; + } + + public static Long getMemoryLimitFromCgroup() throws IOException { + long memoryLimit; + + // Try reading memory limit for cgroups v1 + if (Files.exists(Paths.get("/sys/fs/cgroup/memory/memory.limit_in_bytes"))) { + String memoryLimitString = readCgroupFile("/sys/fs/cgroup/memory/memory.limit_in_bytes"); + + memoryLimit = Long.parseLong(memoryLimitString); + + // In cgroups v1, if the memory limit for a container isn't set, it typically returns 9223372036854771712 (2^63-1). + // However, this value may vary based on the architecture. + // Therefore, we consider it acceptable if the returned memory limit exceeds 100TB, indicating that the limit is not configured using cgroups. + if(memoryLimit > MAX_CONTAINER_MEMORY_LIMIT_BYTES || memoryLimit == 0 || memoryLimit == -1) { + memoryLimit = getTotalPhysicalMemorySize(); + } + + // Try reading memory limit for cgroups v2 + } else if (Files.exists(Paths.get("/sys/fs/cgroup/memory.max"))) { + String memoryLimitString = readCgroupFile("/sys/fs/cgroup/memory.max"); + + if(MAX_MEMORY_CGROUP_V2.equals(memoryLimitString)){ // memory limit is not set using cgroups v2 + memoryLimit = getTotalPhysicalMemorySize(); + }else{ + memoryLimit = Long.parseLong(memoryLimitString); + } + + + } else { + logger.debug("Could not find cgroup max memory file. Will return os physical memory instead."); + return getTotalPhysicalMemorySize(); + } + + return memoryLimit; + } + + public static Long getMemAvailableFromCgroup() throws IOException { + Long memoryUsage; + Long memoryLimit; + + // Try reading memory usage and limit for cgroups v1 + if (Files.exists(Paths.get("/sys/fs/cgroup/memory/memory.usage_in_bytes")) && + Files.exists(Paths.get("/sys/fs/cgroup/memory/memory.limit_in_bytes"))) { + String memoryUsageString = readCgroupFile("/sys/fs/cgroup/memory/memory.usage_in_bytes"); + memoryUsage = Long.parseLong(memoryUsageString); + memoryLimit = getMemoryLimitFromCgroup(); + + // Try reading memory usage and limit for cgroups v2 + } else if (Files.exists(Paths.get("/sys/fs/cgroup/memory.current")) && + Files.exists(Paths.get("/sys/fs/cgroup/memory.max"))) { + + String memoryUsageString = readCgroupFile("/sys/fs/cgroup/memory.current"); + memoryUsage = Long.parseLong(memoryUsageString); + memoryLimit = getMemoryLimitFromCgroup(); + + } else { + logger.debug("Could not find cgroup memory files. Will return os free physical memory instead."); + return osFreePhysicalMemory(); + } + + // Calculate available memory + return memoryLimit - memoryUsage; + } + + public static String readCgroupFile(String filePath) throws IOException { + try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) { + return reader.readLine().trim(); + } + } } diff --git a/src/test/java/io/antmedia/test/rest/BroadcastRestServiceV2UnitTest.java b/src/test/java/io/antmedia/test/rest/BroadcastRestServiceV2UnitTest.java index 4e2abbe50..34d437be8 100644 --- a/src/test/java/io/antmedia/test/rest/BroadcastRestServiceV2UnitTest.java +++ b/src/test/java/io/antmedia/test/rest/BroadcastRestServiceV2UnitTest.java @@ -2150,7 +2150,7 @@ public void testAddIPCamera() { streamSourceRest.setAppCtx(appContext); - StatsCollector monitorService = new StatsCollector(); + StatsCollector monitorService = spy(StatsCollector.class); when(appContext.getBean(IStatsCollector.BEAN_NAME)).thenReturn(monitorService); @@ -2181,6 +2181,8 @@ public void testAddIPCamera() { int cpuLoad2 = 70; int cpuLimit2 = 80; + when(monitorService.getMemoryLoad()).thenReturn(20); + monitorService.setCpuLimit(cpuLimit2); monitorService.setCpuLoad(cpuLoad2); diff --git a/src/test/java/io/antmedia/test/statistic/StatsCollectorTest.java b/src/test/java/io/antmedia/test/statistic/StatsCollectorTest.java index 4ee9c391a..72c891782 100644 --- a/src/test/java/io/antmedia/test/statistic/StatsCollectorTest.java +++ b/src/test/java/io/antmedia/test/statistic/StatsCollectorTest.java @@ -7,15 +7,15 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.*; import java.io.IOException; import java.lang.management.ManagementFactory; import java.lang.management.ThreadInfo; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.*; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -25,10 +25,12 @@ import org.apache.kafka.clients.producer.RecordMetadata; import org.awaitility.Awaitility; import org.bytedeco.ffmpeg.avutil.AVRational; +import org.bytedeco.javacpp.Pointer; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.mockito.ArgumentCaptor; +import org.mockito.MockedStatic; import org.mockito.Mockito; import org.red5.server.Launcher; import org.red5.server.api.IContext; @@ -348,7 +350,7 @@ public void testHeartbeat() { return true; }); - Mockito.verify(resMonitor, Mockito.times(1)).startAnalytic(); + Mockito.verify(resMonitor, times(1)).startAnalytic(); resMonitor.cancelHeartBeat(); @@ -357,7 +359,7 @@ public void testHeartbeat() { resMonitor.setHeartBeatEnabled(false); resMonitor.start(); assertFalse(resMonitor.isHeartBeatEnabled()); - Mockito.verify(resMonitor, Mockito.times(1)).startAnalytic(); + Mockito.verify(resMonitor, times(1)).startAnalytic(); resMonitor.cancelHeartBeat(); @@ -481,7 +483,7 @@ public void testCollectAndSendWebRTCStats() { resMonitor.collectAndSendWebRTCClientsStats(); - verify(kafkaProducer, Mockito.times(1)).send(Mockito.any()); + verify(kafkaProducer, times(1)).send(Mockito.any()); } @@ -712,5 +714,249 @@ public void testGetAppAdaptor() } - + + @Test + public void testOsAvailableMemory() { + long availableMemory = 1024; + long containerizedMemory = 2048; + try (MockedStatic mockedSystemUtils = mockStatic(SystemUtils.class)) { + // Set up base mocking to allow real method call + mockedSystemUtils.when(SystemUtils::osAvailableMemory) + .thenCallRealMethod(); + + // Test non-containerized scenario + mockedSystemUtils.when(SystemUtils::isContainerized).thenReturn(false); + mockedSystemUtils.when(SystemUtils::availablePhysicalBytes).thenReturn(availableMemory); + + long nonContainerizedResult = SystemUtils.osAvailableMemory(); + assertEquals(availableMemory, nonContainerizedResult); + + // Reset the state for containerized scenario + SystemUtils.containerized = null; + + // Test containerized scenario + mockedSystemUtils.when(SystemUtils::isContainerized).thenReturn(true); + mockedSystemUtils.when(SystemUtils::getMemAvailableFromCgroup).thenReturn(containerizedMemory); + + long containerizedResult = SystemUtils.osAvailableMemory(); + assertEquals(containerizedMemory, containerizedResult); + + // Verify all calls + mockedSystemUtils.verify(SystemUtils::isContainerized, times(2)); + mockedSystemUtils.verify(SystemUtils::availablePhysicalBytes, times(1)); + mockedSystemUtils.verify(SystemUtils::getMemAvailableFromCgroup, times(1)); + } + } + + @Test + public void testIsContainerized() { + Path mockDockerEnvPath = Path.of("/tmp/test/.dockerenv"); + Path mockCgroupPath = Path.of("/tmp/test/cgroup"); + + try (MockedStatic mockedFiles = mockStatic(Files.class); + MockedStatic mockedPaths = mockStatic(Paths.class); + ) { + + // Setup path mocks + mockedPaths.when(() -> Paths.get("/.dockerenv")).thenReturn(mockDockerEnvPath); + mockedPaths.when(() -> Paths.get("/proc/self/cgroup")).thenReturn(mockCgroupPath); + + // Test 1: Docker environment file exists + mockedFiles.when(() -> Files.exists(mockDockerEnvPath)).thenReturn(true); + assertTrue(SystemUtils.isContainerized()); + + + // Test 2: Docker in cgroup + mockedFiles.when(() -> Files.exists(mockDockerEnvPath)).thenReturn(false); + mockedFiles.when(() -> Files.exists(mockCgroupPath)).thenReturn(true); + mockedFiles.when(() -> Files.readAllLines(mockCgroupPath)) + .thenReturn(Collections.singletonList("12:memory:/docker/someId")); + assertTrue(SystemUtils.isContainerized()); + + // Test 3: LXC in cgroup + mockedFiles.when(() -> Files.readAllLines(mockCgroupPath)) + .thenReturn(Collections.singletonList("12:memory:/lxc/someId")); + assertTrue(SystemUtils.isContainerized()); + + // Test 4: Kubernetes in cgroup + mockedFiles.when(() -> Files.readAllLines(mockCgroupPath)) + .thenReturn(Collections.singletonList("12:memory:/kubepods/someId")); + assertTrue(SystemUtils.isContainerized()); + + // Test 5: Containerd in cgroup + mockedFiles.when(() -> Files.readAllLines(mockCgroupPath)) + .thenReturn(Collections.singletonList("12:memory:/containerd/someId")); + assertTrue(SystemUtils.isContainerized()); + + // Test 6: No container + mockedFiles.when(() -> Files.readAllLines(mockCgroupPath)) + .thenReturn(Collections.singletonList("12:memory:/user.slice")); + assertFalse(SystemUtils.isContainerized()); + + // Test 7: File access exception + mockedFiles.when(() -> Files.exists(mockDockerEnvPath)) + .thenThrow(new SecurityException("Access denied")); + assertFalse(SystemUtils.isContainerized()); + + // Test 8: Read exception + mockedFiles.when(() -> Files.exists(mockDockerEnvPath)).thenReturn(false); + mockedFiles.when(() -> Files.exists(mockCgroupPath)).thenReturn(true); + mockedFiles.when(() -> Files.readAllLines(mockCgroupPath)) + .thenThrow(new IOException("Read error")); + assertFalse(SystemUtils.isContainerized()); + } + + } + + @Test + public void testGetMemAvailableFromCgroup() throws IOException { + String cgroupV1UsagePath = "/sys/fs/cgroup/memory/memory.usage_in_bytes"; + String cgroupV1LimitPath = "/sys/fs/cgroup/memory/memory.limit_in_bytes"; + String cgroupV2UsagePath = "/sys/fs/cgroup/memory.current"; + String cgroupV2LimitPath = "/sys/fs/cgroup/memory.max"; + String expectedUsageStr = "5000"; + String expectedLimitStr = "10000"; + + long expectedOsFreeMemory = 8000L; + + // Test all three scenarios using nested try-with-resources + try (MockedStatic mockedFiles = mockStatic(Files.class); + MockedStatic mockedPaths = mockStatic(Paths.class)) { + + // Mock Path objects + Path cgroupV1UsagePathObj = mock(Path.class); + Path cgroupV1LimitPathObj = mock(Path.class); + Path cgroupV2UsagePathObj = mock(Path.class); + Path cgroupV2LimitPathObj = mock(Path.class); + + // Set up Paths.get() mocks + when(Paths.get(cgroupV1UsagePath)).thenReturn(cgroupV1UsagePathObj); + when(Paths.get(cgroupV1LimitPath)).thenReturn(cgroupV1LimitPathObj); + when(Paths.get(cgroupV2UsagePath)).thenReturn(cgroupV2UsagePathObj); + when(Paths.get(cgroupV2LimitPath)).thenReturn(cgroupV2LimitPathObj); + + // Test Scenario 1: cgroups v1 exists + mockedFiles.when(() -> Files.exists(cgroupV1UsagePathObj)).thenReturn(true); + mockedFiles.when(() -> Files.exists(cgroupV1LimitPathObj)).thenReturn(true); + + try (MockedStatic mockedMemoryUtils = mockStatic(SystemUtils.class, + CALLS_REAL_METHODS)) { + mockedMemoryUtils.when(() -> SystemUtils.readCgroupFile(cgroupV1UsagePath)) + .thenReturn(expectedUsageStr); + mockedMemoryUtils.when(() -> SystemUtils.readCgroupFile(cgroupV1LimitPath)) + .thenReturn(expectedLimitStr); + + long result = SystemUtils.getMemAvailableFromCgroup(); + assertEquals(Long.parseLong(expectedLimitStr) - Long.parseLong(expectedUsageStr), result); + } + + // Test Scenario 2: cgroups v2 exists (v1 doesn't exist) + mockedFiles.when(() -> Files.exists(cgroupV1UsagePathObj)).thenReturn(false); + mockedFiles.when(() -> Files.exists(cgroupV1LimitPathObj)).thenReturn(false); + mockedFiles.when(() -> Files.exists(cgroupV2UsagePathObj)).thenReturn(true); + mockedFiles.when(() -> Files.exists(cgroupV2LimitPathObj)).thenReturn(true); + + try (MockedStatic mockedMemoryUtils = mockStatic(SystemUtils.class, + CALLS_REAL_METHODS)) { + mockedMemoryUtils.when(() -> SystemUtils.readCgroupFile(cgroupV2UsagePath)) + .thenReturn(expectedUsageStr); + mockedMemoryUtils.when(() -> SystemUtils.readCgroupFile(cgroupV2LimitPath)) + .thenReturn(expectedLimitStr); + + long result = SystemUtils.getMemAvailableFromCgroup(); + assertEquals(Long.parseLong(expectedLimitStr) - Long.parseLong(expectedUsageStr), result); + } + + // Test Scenario 3: No cgroups exist, falls back to OS memory + mockedFiles.when(() -> Files.exists(cgroupV2UsagePathObj)).thenReturn(false); + mockedFiles.when(() -> Files.exists(cgroupV2LimitPathObj)).thenReturn(false); + + try (MockedStatic mockedMemoryUtils = mockStatic(SystemUtils.class, + CALLS_REAL_METHODS)) { + mockedMemoryUtils.when(SystemUtils::osFreePhysicalMemory) + .thenReturn(expectedOsFreeMemory); + + long result = SystemUtils.getMemAvailableFromCgroup(); + assertEquals(expectedOsFreeMemory, result); + } + } + } + + @Test + public void testOsTotalPhysicalMemory() throws IOException { + final long PHYSICAL_MEMORY_SIZE = 16_000_000_000L; // 16GB + + try (MockedStatic mockedFiles = mockStatic(Files.class); + MockedStatic mockedSystemUtils = mockStatic(SystemUtils.class, + CALLS_REAL_METHODS)) { + + // Test Case 1: Non-containerized environment + mockedSystemUtils.when(SystemUtils::isContainerized) + .thenReturn(false); + mockedSystemUtils.when(SystemUtils::getTotalPhysicalMemorySize) + .thenReturn(PHYSICAL_MEMORY_SIZE); + + // Call the real method for osTotalPhysicalMemory + mockedSystemUtils.when(SystemUtils::osTotalPhysicalMemory) + .thenCallRealMethod(); + + assertEquals(PHYSICAL_MEMORY_SIZE, SystemUtils.osTotalPhysicalMemory()); + + // Test Case 2: Containerized environment with cgroups v1 + mockedSystemUtils.when(SystemUtils::isContainerized) + .thenReturn(true); + + // Mock cgroups v1 path exists + mockedFiles.when(() -> Files.exists(Paths.get("/sys/fs/cgroup/memory/memory.limit_in_bytes"))) + .thenReturn(true); + mockedFiles.when(() -> Files.exists(Paths.get("/sys/fs/cgroup/memory.max"))) + .thenReturn(false); + + // Test 2a: Normal memory limit + long containerLimit = 8_000_000_000L; // 8GB + mockedSystemUtils.when(() -> SystemUtils.readCgroupFile(anyString())) + .thenReturn(String.valueOf(containerLimit)); + + assertEquals(containerLimit, SystemUtils.getMemoryLimitFromCgroup().longValue()); + + // Test 2b: Memory limit above MAX_CONTAINER_MEMORY_LIMIT_BYTES + mockedSystemUtils.when(() -> SystemUtils.readCgroupFile(anyString())) + .thenReturn(String.valueOf(SystemUtils.MAX_CONTAINER_MEMORY_LIMIT_BYTES + 1)); + + assertEquals(PHYSICAL_MEMORY_SIZE, SystemUtils.getMemoryLimitFromCgroup().longValue()); + + // Test Case 3: Containerized environment with cgroups v2 + mockedFiles.when(() -> Files.exists(Paths.get("/sys/fs/cgroup/memory/memory.limit_in_bytes"))) + .thenReturn(false); + mockedFiles.when(() -> Files.exists(Paths.get("/sys/fs/cgroup/memory.max"))) + .thenReturn(true); + + // Test 3a: Normal memory limit + mockedSystemUtils.when(() -> SystemUtils.readCgroupFile(anyString())) + .thenReturn("4000000000"); // 4GB + + assertEquals(4_000_000_000L, SystemUtils.getMemoryLimitFromCgroup().longValue()); + + // Test 3b: No limit set (max) + mockedSystemUtils.when(() -> SystemUtils.readCgroupFile(anyString())) + .thenReturn(SystemUtils.MAX_MEMORY_CGROUP_V2); + + assertEquals(PHYSICAL_MEMORY_SIZE, SystemUtils.getMemoryLimitFromCgroup().longValue()); + + // Test Case 4: No cgroup files exist + mockedFiles.when(() -> Files.exists(any(Path.class))) + .thenReturn(false); + + assertEquals(PHYSICAL_MEMORY_SIZE, SystemUtils.getMemoryLimitFromCgroup().longValue()); + + // Test Case 5: IOException handling + mockedSystemUtils.when(SystemUtils::isContainerized) + .thenReturn(true); + mockedSystemUtils.when(SystemUtils::getMemoryLimitFromCgroup) + .thenThrow(new IOException("Test exception")); + + assertEquals(PHYSICAL_MEMORY_SIZE, SystemUtils.osTotalPhysicalMemory()); + } + } + }