From 7b58d854edb27dea816e55287a92e5ea73dea661 Mon Sep 17 00:00:00 2001 From: mekya Date: Sun, 24 Nov 2024 12:37:14 +0300 Subject: [PATCH 1/6] Optimize reconnect-faster if the origin instance is terminated --- .../antmedia/AntMediaApplicationAdapter.java | 73 ++++++++++++++----- .../AcceptOnlyStreamsInDataStore.java | 9 ++- .../streamsource/StreamFetcherManager.java | 7 +- .../AntMediaApplicationAdaptorUnitTest.java | 12 +++ 4 files changed, 81 insertions(+), 20 deletions(-) diff --git a/src/main/java/io/antmedia/AntMediaApplicationAdapter.java b/src/main/java/io/antmedia/AntMediaApplicationAdapter.java index 54054b739..e9b0e5f55 100644 --- a/src/main/java/io/antmedia/AntMediaApplicationAdapter.java +++ b/src/main/java/io/antmedia/AntMediaApplicationAdapter.java @@ -6,6 +6,12 @@ import java.io.IOException; import java.lang.reflect.Field; import java.lang.reflect.Modifier; +import java.net.HttpURLConnection; +import java.net.URI; +import java.net.URL; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; @@ -26,6 +32,7 @@ import org.apache.commons.io.FileUtils; import org.apache.commons.io.FilenameUtils; import org.apache.commons.lang3.RandomStringUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.http.HttpEntity; import org.apache.http.HttpStatus; @@ -98,7 +105,6 @@ import io.antmedia.webrtc.api.IWebRTCAdaptor; import io.antmedia.webrtc.api.IWebRTCClient; import io.antmedia.websocket.WebSocketConstants; -import io.micrometer.common.util.StringUtils; import io.vertx.core.Vertx; import io.vertx.core.json.JsonObject; import io.vertx.ext.dropwizard.MetricsService; @@ -635,8 +641,20 @@ public void closeBroadcast(String streamId) { Broadcast broadcast = getDataStore().get(streamId); if (broadcast != null) { - getDataStore().updateStatus(streamId, BROADCAST_STATUS_FINISHED); + if (broadcast.isZombi()) { + logger.info("Deleting streamId:{} because it's a zombi stream", streamId); + getDataStore().delete(streamId); + } + else { + + getDataStore().updateStatus(streamId, BROADCAST_STATUS_FINISHED); + // This is resets Viewer map in HLS Viewer Stats + resetHLSStats(streamId); + + // This is resets Viewer map in DASH Viewer Stats + resetDASHStats(streamId); + } final String listenerHookURL = getListenerHookURL(broadcast); if (listenerHookURL != null && !listenerHookURL.isEmpty()) { @@ -658,20 +676,7 @@ public void closeBroadcast(String streamId) { if(StringUtils.isNotBlank(broadcast.getMainTrackStreamId())) { updateMainTrackWithRecentlyFinishedBroadcast(broadcast); - } - - if (broadcast.isZombi()) { - - logger.info("Deleting streamId:{} because it's a zombi stream", streamId); - getDataStore().delete(streamId); - } - else { - // This is resets Viewer map in HLS Viewer Stats - resetHLSStats(streamId); - - // This is resets Viewer map in DASH Viewer Stats - resetDASHStats(streamId); - } + } for (IStreamListener listener : streamListeners) { listener.streamFinished(broadcast.getStreamId()); @@ -683,6 +688,38 @@ public void closeBroadcast(String streamId) { } } + public static boolean isInstanceAlive(String originAdress, String hostAddress, int httpPort, String appName) { + if (StringUtils.isBlank(originAdress) || StringUtils.equals(originAdress, hostAddress)) { + return true; + } + + String url = "http://" + originAdress + ":" + httpPort + "/" + appName; + + boolean result = isEndpointReachable(url); + if (!result) { + logger.warn("Instance with origin address {} is not reachable through its app:{}", originAdress, appName); + } + return false; + } + + public static boolean isEndpointReachable(String endpoint) { + HttpClient client = HttpClient.newHttpClient(); + HttpRequest request = HttpRequest.newBuilder() + .uri(URI.create(endpoint)) + .method("HEAD", HttpRequest.BodyPublishers.noBody()) // HEAD request + .timeout(java.time.Duration.ofSeconds(1)) + .build(); + + try { + HttpResponse response = client.send(request, HttpResponse.BodyHandlers.discarding()); + return true; + } + catch (Exception e) { + logger.error("Enpoint is not reachable: {}, {}", endpoint, ExceptionUtils.getStackTrace(e)); + return false; + } + } + /** * If multiple threads enter the method at the same time, the following method does not work correctly. * So we have made it synchronized @@ -1472,13 +1509,13 @@ public void closeStreamFetchers() { public void waitUntilLiveStreamsStopped() { int i = 0; - int waitPeriod = 1000; + int waitPeriod = 500; boolean everythingHasStopped = true; while(getDataStore().getLocalLiveBroadcastCount(getServerSettings().getHostAddress()) > 0) { try { if (i > 3) { logger.warn("Waiting for active broadcasts number decrease to zero for app: {}" - + "total wait time: {}ms", getScope().getName(), i*waitPeriod); + + " total wait time: {}ms", getScope().getName(), i*waitPeriod); } if (i>10) { logger.error("Not all live streams're stopped gracefully. It will update the streams' status to finished explicitly"); diff --git a/src/main/java/io/antmedia/security/AcceptOnlyStreamsInDataStore.java b/src/main/java/io/antmedia/security/AcceptOnlyStreamsInDataStore.java index d5929ffd5..fb1169826 100644 --- a/src/main/java/io/antmedia/security/AcceptOnlyStreamsInDataStore.java +++ b/src/main/java/io/antmedia/security/AcceptOnlyStreamsInDataStore.java @@ -59,7 +59,10 @@ public boolean isPublishAllowed(IScope scope, String name, String mode, Map shuttingDown()); } @@ -119,7 +123,8 @@ public boolean isStreamRunning(Broadcast broadcast) { if (!isStreamLive) { //this stream may be fetching in somewhere in the cluster - isStreamLive = AntMediaApplicationAdapter.isStreaming(broadcast); + isStreamLive = AntMediaApplicationAdapter.isStreaming(broadcast) && + AntMediaApplicationAdapter.isInstanceAlive(broadcast.getOriginAdress(), serverSettings.getHostAddress(), serverSettings.getDefaultHttpPort(), scope.getName()); } return isStreamLive; diff --git a/src/test/java/io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java b/src/test/java/io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java index e221d82a2..6ae493844 100644 --- a/src/test/java/io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java +++ b/src/test/java/io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java @@ -228,6 +228,18 @@ public void testFirebase() throws IOException, FirebaseMessagingException { } } + + @Test + public void testEndpointReachable() { + boolean endpointReachable = AntMediaApplicationAdapter.isEndpointReachable("http://antmedia.io/not_exist"); + //it should be true because we're just checking if it's reachable + assertTrue(endpointReachable); + + endpointReachable = AntMediaApplicationAdapter.isEndpointReachable("http://antmedia.io:45454/not_exist"); + assertFalse(endpointReachable); + + } + @Test public void testIsIncomingTimeValid() { AppSettings newSettings = new AppSettings(); From 39e5e3d8d80d4f3d3088ed61b82a52cda31c9f50 Mon Sep 17 00:00:00 2001 From: mekya Date: Sun, 24 Nov 2024 12:38:48 +0300 Subject: [PATCH 2/6] Don't accept broadcast without origin as local It delays the shutdown process and reconnect failures --- src/main/java/io/antmedia/datastore/db/DataStore.java | 4 ++-- src/main/java/io/antmedia/datastore/db/MongoStore.java | 8 ++------ 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/src/main/java/io/antmedia/datastore/db/DataStore.java b/src/main/java/io/antmedia/datastore/db/DataStore.java index 5e74b294c..625b8f347 100644 --- a/src/main/java/io/antmedia/datastore/db/DataStore.java +++ b/src/main/java/io/antmedia/datastore/db/DataStore.java @@ -668,7 +668,7 @@ public long getActiveBroadcastCount(Map broadcastMap, Gson gson, Broadcast broadcast = gson.fromJson(broadcastString, Broadcast.class); String status = broadcast.getStatus(); if (IAntMediaStreamHandler.BROADCAST_STATUS_BROADCASTING.equals(status) && - (StringUtils.isAnyBlank(hostAddress, broadcast.getOriginAdress()) || hostAddress.equals(broadcast.getOriginAdress()))) + (StringUtils.isBlank(hostAddress) || hostAddress.equals(broadcast.getOriginAdress()))) { activeBroadcastCount++; } @@ -688,7 +688,7 @@ public List getActiveBroadcastList(Map broadcastMap, String status = broadcast.getStatus(); if (IAntMediaStreamHandler.BROADCAST_STATUS_BROADCASTING.equals(status) && - (StringUtils.isAnyBlank(hostAddress, broadcast.getOriginAdress()) || hostAddress.equals(broadcast.getOriginAdress()))) + (StringUtils.isBlank(hostAddress) || hostAddress.equals(broadcast.getOriginAdress()))) { broadcastList.add(broadcast); } diff --git a/src/main/java/io/antmedia/datastore/db/MongoStore.java b/src/main/java/io/antmedia/datastore/db/MongoStore.java index 813c732c7..bce50c94f 100644 --- a/src/main/java/io/antmedia/datastore/db/MongoStore.java +++ b/src/main/java/io/antmedia/datastore/db/MongoStore.java @@ -1368,9 +1368,7 @@ public long getLocalLiveBroadcastCount(String hostAddress) { return datastore.find(Broadcast.class) .filter(Filters.and( Filters.or( - Filters.eq(ORIGIN_ADDRESS, hostAddress), - Filters.exists(ORIGIN_ADDRESS).not() - ), + Filters.eq(ORIGIN_ADDRESS, hostAddress) ), Filters.eq(STATUS, IAntMediaStreamHandler.BROADCAST_STATUS_BROADCASTING) )).count(); } @@ -1383,9 +1381,7 @@ public List getLocalLiveBroadcasts(String hostAddress) return datastore.find(Broadcast.class) .filter(Filters.and( Filters.or( - Filters.eq(ORIGIN_ADDRESS, hostAddress), - Filters.exists(ORIGIN_ADDRESS).not() - ), + Filters.eq(ORIGIN_ADDRESS, hostAddress) ), Filters.eq(STATUS, IAntMediaStreamHandler.BROADCAST_STATUS_BROADCASTING) )).iterator().toList(); } From d607be7531f5aedd8813ff448ed5f0409470dcac Mon Sep 17 00:00:00 2001 From: mekya Date: Sun, 24 Nov 2024 18:35:44 +0300 Subject: [PATCH 3/6] Fix test cases and add new test cases --- .../AntMediaApplicationAdaptorUnitTest.java | 4 +- .../io/antmedia/test/db/DBStoresUnitTest.java | 62 ++++++++++++++++++- 2 files changed, 63 insertions(+), 3 deletions(-) diff --git a/src/test/java/io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java b/src/test/java/io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java index 6ae493844..9b647176b 100644 --- a/src/test/java/io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java +++ b/src/test/java/io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java @@ -96,6 +96,7 @@ import io.antmedia.rest.model.Result; import io.antmedia.security.AcceptOnlyStreamsInDataStore; import io.antmedia.settings.ServerSettings; +import io.antmedia.statistic.HlsViewerStats; import io.antmedia.statistic.type.WebRTCAudioReceiveStats; import io.antmedia.statistic.type.WebRTCAudioSendStats; import io.antmedia.statistic.type.WebRTCVideoReceiveStats; @@ -827,8 +828,9 @@ public void testHookAfterDefined() assertEquals(hookURL, spyAdaptor.getListenerHookURL(broadcast)); - spyAdaptor = Mockito.spy(adapter); + Mockito.doNothing().when(spyAdaptor).resetHLSStats(Mockito.anyString()); + Mockito.doNothing().when(spyAdaptor).resetDASHStats(Mockito.anyString()); appSettings = new AppSettings(); spyAdaptor.setServerSettings(new ServerSettings()); spyAdaptor.setAppSettings(appSettings); diff --git a/src/test/java/io/antmedia/test/db/DBStoresUnitTest.java b/src/test/java/io/antmedia/test/db/DBStoresUnitTest.java index 6f69edcdf..3194c7dc6 100644 --- a/src/test/java/io/antmedia/test/db/DBStoresUnitTest.java +++ b/src/test/java/io/antmedia/test/db/DBStoresUnitTest.java @@ -141,7 +141,7 @@ public void testMapDBStore() throws Exception { DataStore dataStore = new MapDBStore("testdb", vertx); - + testLocalLiveBroadcast(dataStore); testUpdateBroadcastEncoderSettings(dataStore); testSubscriberMetaData(dataStore); testGetActiveBroadcastCount(dataStore); @@ -231,6 +231,7 @@ public void testMemoryDataStore() throws Exception { testVoDFunctions(dataStore); + testLocalLiveBroadcast(dataStore); testUpdateBroadcastEncoderSettings(dataStore); testSubscriberMetaData(dataStore); testBlockSubscriber(dataStore); @@ -293,8 +294,10 @@ public void testMongoStore() throws Exception { dataStore = new MongoStore("127.0.0.1", "", "", "testdb"); + testSaveDuplicateStreamId((MongoStore)dataStore); + testLocalLiveBroadcast(dataStore); testUpdateBroadcastEncoderSettings(dataStore); testSubscriberMetaData(dataStore); testBlockSubscriber(dataStore); @@ -360,6 +363,7 @@ public void testRedisStore() throws Exception { dataStore.close(true); dataStore = new RedisStore("redis://127.0.0.1:6379", "testdb"); + testLocalLiveBroadcast(dataStore); testUpdateBroadcastEncoderSettings(dataStore); testSubscriberMetaData(dataStore); testBlockSubscriber(dataStore); @@ -644,6 +648,58 @@ public void testUnexpectedBroadcastOffset(DataStore dataStore) { assertNotNull(broadcastList); assertEquals(0, broadcastList.size()); } + + public void testLocalLiveBroadcast(DataStore dataStore) { + clear(dataStore); + + assertEquals(0, dataStore.getBroadcastCount()); + + long streamCount = 10; + String streamId = null; + for (int i = 0; i < streamCount; i++) { + Broadcast broadcast = new Broadcast(null, null); + broadcast.setStatus(AntMediaApplicationAdapter.BROADCAST_STATUS_BROADCASTING); + broadcast.setUpdateTime(System.currentTimeMillis()); + streamId = dataStore.save(broadcast); + logger.info("Saved streamId:{}", streamId); + } + + + assertEquals(streamCount, dataStore.getActiveBroadcastCount()); + + if (dataStore instanceof MapDBStore || dataStore instanceof InMemoryDataStore) { + assertEquals(streamCount, dataStore.getLocalLiveBroadcastCount(ServerSettings.getLocalHostAddress())); + + List localLiveBroadcasts = dataStore.getLocalLiveBroadcasts(ServerSettings.getLocalHostAddress()); + assertEquals(streamCount, localLiveBroadcasts.size()); + } else { + + //because there is no origin address registered + assertEquals(0, dataStore.getLocalLiveBroadcastCount(ServerSettings.getLocalHostAddress())); + List localLiveBroadcasts = dataStore.getLocalLiveBroadcasts(ServerSettings.getLocalHostAddress()); + assertEquals(0, localLiveBroadcasts.size()); + } + + clear(dataStore); + + assertEquals(0, dataStore.getBroadcastCount()); + + streamCount = 15; + for (int i = 0; i < streamCount; i++) { + Broadcast broadcast = new Broadcast(null, null); + broadcast.setStatus(AntMediaApplicationAdapter.BROADCAST_STATUS_BROADCASTING); + broadcast.setUpdateTime(System.currentTimeMillis()); + broadcast.setOriginAdress(ServerSettings.getLocalHostAddress()); + streamId = dataStore.save(broadcast); + logger.info("Saved streamId:{}", streamId); + } + + assertEquals(streamCount, dataStore.getLocalLiveBroadcastCount(ServerSettings.getLocalHostAddress())); + + List localLiveBroadcasts = dataStore.getLocalLiveBroadcasts(ServerSettings.getLocalHostAddress()); + assertEquals(streamCount, localLiveBroadcasts.size()); + + } public void testGetActiveBroadcastCount(DataStore dataStore) { @@ -661,7 +717,9 @@ public void testGetActiveBroadcastCount(DataStore dataStore) { String streamId = null; for (int i = 0; i < streamCount; i++) { - streamId = dataStore.save(new Broadcast(null, null)); + Broadcast broadcast = new Broadcast(null, null); + broadcast.setOriginAdress(ServerSettings.getLocalHostAddress()); + streamId = dataStore.save(broadcast); logger.info("Saved streamId:{}", streamId); } From 6f6330e036109ee174ed45a472a347ee0fd6e7a7 Mon Sep 17 00:00:00 2001 From: mekya Date: Mon, 25 Nov 2024 08:50:22 +0300 Subject: [PATCH 4/6] Increase test coverage --- .../antmedia/AntMediaApplicationAdapter.java | 2 +- .../AntMediaApplicationAdaptorUnitTest.java | 12 +++++++ .../test/StreamSchedularUnitTest.java | 31 +++++++++++++++++++ .../AcceptOnlyStreamsInDataStoreTest.java | 11 +++++++ 4 files changed, 55 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/antmedia/AntMediaApplicationAdapter.java b/src/main/java/io/antmedia/AntMediaApplicationAdapter.java index e9b0e5f55..40aa99a64 100644 --- a/src/main/java/io/antmedia/AntMediaApplicationAdapter.java +++ b/src/main/java/io/antmedia/AntMediaApplicationAdapter.java @@ -699,7 +699,7 @@ public static boolean isInstanceAlive(String originAdress, String hostAddress, i if (!result) { logger.warn("Instance with origin address {} is not reachable through its app:{}", originAdress, appName); } - return false; + return result; } public static boolean isEndpointReachable(String endpoint) { diff --git a/src/test/java/io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java b/src/test/java/io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java index 9b647176b..2d712c29c 100644 --- a/src/test/java/io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java +++ b/src/test/java/io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java @@ -238,6 +238,18 @@ public void testEndpointReachable() { endpointReachable = AntMediaApplicationAdapter.isEndpointReachable("http://antmedia.io:45454/not_exist"); assertFalse(endpointReachable); + + boolean instanceAlive = AntMediaApplicationAdapter.isInstanceAlive("antmedia.io", null, 80, ""); + assertTrue(instanceAlive); + + instanceAlive = AntMediaApplicationAdapter.isInstanceAlive("antmedia.io", null, 4545, ""); + assertFalse(instanceAlive); + + instanceAlive = AntMediaApplicationAdapter.isInstanceAlive("", null, 4545, ""); + assertTrue(instanceAlive); + + instanceAlive = AntMediaApplicationAdapter.isInstanceAlive("localhost", "localhost", 4545, ""); + assertTrue(instanceAlive); } diff --git a/src/test/java/io/antmedia/test/StreamSchedularUnitTest.java b/src/test/java/io/antmedia/test/StreamSchedularUnitTest.java index 0b11de3cd..d919170ba 100644 --- a/src/test/java/io/antmedia/test/StreamSchedularUnitTest.java +++ b/src/test/java/io/antmedia/test/StreamSchedularUnitTest.java @@ -57,6 +57,7 @@ import io.antmedia.FFmpegUtilities; import io.antmedia.datastore.db.DataStore; import io.antmedia.datastore.db.IDataStoreFactory; +import io.antmedia.datastore.db.InMemoryDataStore; import io.antmedia.datastore.db.MapDBStore; import io.antmedia.datastore.db.types.Broadcast; import io.antmedia.datastore.db.types.Broadcast.PlayListItem; @@ -701,6 +702,36 @@ public void testSkipPlaylistItem() { fail(e.getMessage()); } } + + @Test + public void testIsStreamRunning() + { + DataStore dataStore = new InMemoryDataStore("test"); + StreamFetcherManager streamFetcherManager = Mockito.spy(new StreamFetcherManager(vertx, dataStore, appScope)); + + Broadcast broadcast = new Broadcast(); + + dataStore.save(broadcast); + + boolean isStreamRunning = streamFetcherManager.isStreamRunning(broadcast); + assertFalse(isStreamRunning); + + broadcast.setStatus(AntMediaApplicationAdapter.BROADCAST_STATUS_BROADCASTING); + broadcast.setUpdateTime(System.currentTimeMillis()); + + isStreamRunning = streamFetcherManager.isStreamRunning(broadcast); + assertTrue(isStreamRunning); + + broadcast.setOriginAdress("not.accessible.antmedia.io"); + isStreamRunning = streamFetcherManager.isStreamRunning(broadcast); + assertFalse(isStreamRunning); + + broadcast.setUpdateTime(0); + + isStreamRunning = streamFetcherManager.isStreamRunning(broadcast); + assertFalse(isStreamRunning); + + } @Test public void testControlStreamFetchersPlayListAndRestart() { diff --git a/src/test/java/io/antmedia/test/security/AcceptOnlyStreamsInDataStoreTest.java b/src/test/java/io/antmedia/test/security/AcceptOnlyStreamsInDataStoreTest.java index 1561eea19..facec7eb3 100644 --- a/src/test/java/io/antmedia/test/security/AcceptOnlyStreamsInDataStoreTest.java +++ b/src/test/java/io/antmedia/test/security/AcceptOnlyStreamsInDataStoreTest.java @@ -131,6 +131,17 @@ public void testStreamIdInUseCase() assertFalse(filter.isPublishAllowed(scope, preparingBroadcast.getStreamId(), "mode", null, null)); assertFalse(filter.isPublishAllowed(scope, broadcastingBroadcast.getStreamId(), "mode", null, null)); + + //change origin adress that is something not accesible + broadcastingBroadcast.setOriginAdress("not.exist.antmedia.io"); + //it shoudla allow to publish because it's not accessible + assertTrue(filter.isPublishAllowed(scope, broadcastingBroadcast.getStreamId(), "mode", null, null)); + //change origin adress to empty + broadcastingBroadcast.setOriginAdress(""); + //it shoudl not allow to publish because timeout is not passed + assertFalse(filter.isPublishAllowed(scope, broadcastingBroadcast.getStreamId(), "mode", null, null)); + + assertTrue(filter.isPublishAllowed(scope, offlineBroadcast.getStreamId(), "mode", null, null)); assertTrue(filter.isPublishAllowed(scope, stuckedBroadcast.getStreamId(), "mode", null, null)); From 93e9efcf64c4ce085116e04957a26dd1e51cf01b Mon Sep 17 00:00:00 2001 From: mekya Date: Mon, 25 Nov 2024 10:08:08 +0300 Subject: [PATCH 5/6] Refactor code to call the same method to create mainTrack broadcast --- .../io/antmedia/AntMediaApplicationAdapter.java | 17 +++++++++++++++++ src/main/java/io/antmedia/muxer/MuxAdaptor.java | 11 +---------- .../AntMediaApplicationAdaptorUnitTest.java | 10 ++++++++++ 3 files changed, 28 insertions(+), 10 deletions(-) diff --git a/src/main/java/io/antmedia/AntMediaApplicationAdapter.java b/src/main/java/io/antmedia/AntMediaApplicationAdapter.java index 40aa99a64..3fa7865f6 100644 --- a/src/main/java/io/antmedia/AntMediaApplicationAdapter.java +++ b/src/main/java/io/antmedia/AntMediaApplicationAdapter.java @@ -1,5 +1,6 @@ package io.antmedia; +import static io.antmedia.muxer.IAntMediaStreamHandler.BROADCAST_STATUS_BROADCASTING; import static org.bytedeco.ffmpeg.global.avcodec.avcodec_get_name; import java.io.File; @@ -687,6 +688,22 @@ public void closeBroadcast(String streamId) { logger.error(ExceptionUtils.getStackTrace(e)); } } + + public static Broadcast saveMainBroadcast(String streamId, String mainTrackId, DataStore dataStore) { + Broadcast mainBroadcast = new Broadcast(); + try { + mainBroadcast.setStreamId(mainTrackId); + } catch (Exception e) { + logger.error(ExceptionUtils.getStackTrace(e)); + } + mainBroadcast.setZombi(true); + mainBroadcast.setStatus(BROADCAST_STATUS_BROADCASTING); + mainBroadcast.getSubTrackStreamIds().add(streamId); + // don't set setOriginAdress because it's not a real stream and it causes extra delay -> mainBroadcast.setOriginAdress(serverSettings.getHostAddress()) + mainBroadcast.setStartTime(System.currentTimeMillis()); + + return StringUtils.isNotBlank(dataStore.save(mainBroadcast)) ? mainBroadcast : null; + } public static boolean isInstanceAlive(String originAdress, String hostAddress, int httpPort, String appName) { if (StringUtils.isBlank(originAdress) || StringUtils.equals(originAdress, hostAddress)) { diff --git a/src/main/java/io/antmedia/muxer/MuxAdaptor.java b/src/main/java/io/antmedia/muxer/MuxAdaptor.java index 3ec6fc52b..6fae9f3c7 100644 --- a/src/main/java/io/antmedia/muxer/MuxAdaptor.java +++ b/src/main/java/io/antmedia/muxer/MuxAdaptor.java @@ -814,16 +814,7 @@ public void registerToMainTrackIfExists() { Broadcast mainBroadcast = getDataStore().get(mainTrack); if(mainBroadcast == null) { - mainBroadcast = new Broadcast(); - try { - mainBroadcast.setStreamId(mainTrack); - } catch (Exception e) { - logger.error(ExceptionUtils.getStackTrace(e)); - } - mainBroadcast.setZombi(true); - mainBroadcast.setStatus(BROADCAST_STATUS_BROADCASTING); - mainBroadcast.getSubTrackStreamIds().add(streamId); - getDataStore().save(mainBroadcast); + mainBroadcast = AntMediaApplicationAdapter.saveMainBroadcast(streamId, mainTrack, getDataStore()); } else { diff --git a/src/test/java/io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java b/src/test/java/io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java index 2d712c29c..6a226a5e6 100644 --- a/src/test/java/io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java +++ b/src/test/java/io/antmedia/test/AntMediaApplicationAdaptorUnitTest.java @@ -2125,6 +2125,16 @@ public void testAddRemovePacketListener() { } + + @Test + public void testSaveMainBroadcast() + { + DataStore dataStore = new InMemoryDataStore("test"); + Broadcast mainTrack = AntMediaApplicationAdapter.saveMainBroadcast("streamId", "mainTrackId", dataStore); + assertNotNull(mainTrack); + //origin address must be null because main track is not a real stream + assertNull(mainTrack.getOriginAdress()); + } @Test public void testAppDeletion() From 932f45cb29a2cc8678aa4a10f4b032faf3869890 Mon Sep 17 00:00:00 2001 From: mekya Date: Mon, 25 Nov 2024 10:11:50 +0300 Subject: [PATCH 6/6] Fix reliability issue in sonar --- .../io/antmedia/AntMediaApplicationAdapter.java | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/src/main/java/io/antmedia/AntMediaApplicationAdapter.java b/src/main/java/io/antmedia/AntMediaApplicationAdapter.java index 3fa7865f6..c939be4a6 100644 --- a/src/main/java/io/antmedia/AntMediaApplicationAdapter.java +++ b/src/main/java/io/antmedia/AntMediaApplicationAdapter.java @@ -688,7 +688,7 @@ public void closeBroadcast(String streamId) { logger.error(ExceptionUtils.getStackTrace(e)); } } - + public static Broadcast saveMainBroadcast(String streamId, String mainTrackId, DataStore dataStore) { Broadcast mainBroadcast = new Broadcast(); try { @@ -727,14 +727,20 @@ public static boolean isEndpointReachable(String endpoint) { .timeout(java.time.Duration.ofSeconds(1)) .build(); + try { HttpResponse response = client.send(request, HttpResponse.BodyHandlers.discarding()); - return true; - } - catch (Exception e) { + } catch (InterruptedException e) { + logger.error("InterruptedException Enpoint is not reachable: {}, {}", endpoint, ExceptionUtils.getStackTrace(e)); + Thread.currentThread().interrupt(); + return false; + } catch (Exception e) { logger.error("Enpoint is not reachable: {}, {}", endpoint, ExceptionUtils.getStackTrace(e)); return false; } + return true; + + } /**