Skip to content

Commit

Permalink
Merge pull request #6828 from ant-media/reconnect-faster
Browse files Browse the repository at this point in the history
Reconnect faster and shutdown faster
  • Loading branch information
mekya authored Nov 25, 2024
2 parents e2688b3 + 932f45c commit a334a2b
Show file tree
Hide file tree
Showing 10 changed files with 236 additions and 41 deletions.
96 changes: 78 additions & 18 deletions src/main/java/io/antmedia/AntMediaApplicationAdapter.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
package io.antmedia;

import static io.antmedia.muxer.IAntMediaStreamHandler.BROADCAST_STATUS_BROADCASTING;
import static org.bytedeco.ffmpeg.global.avcodec.avcodec_get_name;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URL;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
Expand All @@ -26,6 +33,7 @@
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpStatus;
Expand Down Expand Up @@ -98,7 +106,6 @@
import io.antmedia.webrtc.api.IWebRTCAdaptor;
import io.antmedia.webrtc.api.IWebRTCClient;
import io.antmedia.websocket.WebSocketConstants;
import io.micrometer.common.util.StringUtils;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.dropwizard.MetricsService;
Expand Down Expand Up @@ -635,8 +642,20 @@ public void closeBroadcast(String streamId) {
Broadcast broadcast = getDataStore().get(streamId);
if (broadcast != null) {

getDataStore().updateStatus(streamId, BROADCAST_STATUS_FINISHED);
if (broadcast.isZombi()) {

logger.info("Deleting streamId:{} because it's a zombi stream", streamId);
getDataStore().delete(streamId);
}
else {

getDataStore().updateStatus(streamId, BROADCAST_STATUS_FINISHED);
// This is resets Viewer map in HLS Viewer Stats
resetHLSStats(streamId);

// This is resets Viewer map in DASH Viewer Stats
resetDASHStats(streamId);
}

final String listenerHookURL = getListenerHookURL(broadcast);
if (listenerHookURL != null && !listenerHookURL.isEmpty()) {
Expand All @@ -658,20 +677,7 @@ public void closeBroadcast(String streamId) {

if(StringUtils.isNotBlank(broadcast.getMainTrackStreamId())) {
updateMainTrackWithRecentlyFinishedBroadcast(broadcast);
}

if (broadcast.isZombi()) {

logger.info("Deleting streamId:{} because it's a zombi stream", streamId);
getDataStore().delete(streamId);
}
else {
// This is resets Viewer map in HLS Viewer Stats
resetHLSStats(streamId);

// This is resets Viewer map in DASH Viewer Stats
resetDASHStats(streamId);
}
}

for (IStreamListener listener : streamListeners) {
listener.streamFinished(broadcast.getStreamId());
Expand All @@ -683,6 +689,60 @@ public void closeBroadcast(String streamId) {
}
}

public static Broadcast saveMainBroadcast(String streamId, String mainTrackId, DataStore dataStore) {
Broadcast mainBroadcast = new Broadcast();
try {
mainBroadcast.setStreamId(mainTrackId);
} catch (Exception e) {
logger.error(ExceptionUtils.getStackTrace(e));
}
mainBroadcast.setZombi(true);
mainBroadcast.setStatus(BROADCAST_STATUS_BROADCASTING);
mainBroadcast.getSubTrackStreamIds().add(streamId);
// don't set setOriginAdress because it's not a real stream and it causes extra delay -> mainBroadcast.setOriginAdress(serverSettings.getHostAddress())
mainBroadcast.setStartTime(System.currentTimeMillis());

return StringUtils.isNotBlank(dataStore.save(mainBroadcast)) ? mainBroadcast : null;
}

public static boolean isInstanceAlive(String originAdress, String hostAddress, int httpPort, String appName) {
if (StringUtils.isBlank(originAdress) || StringUtils.equals(originAdress, hostAddress)) {
return true;
}

String url = "http://" + originAdress + ":" + httpPort + "/" + appName;

boolean result = isEndpointReachable(url);
if (!result) {
logger.warn("Instance with origin address {} is not reachable through its app:{}", originAdress, appName);
}
return result;
}

public static boolean isEndpointReachable(String endpoint) {
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(endpoint))
.method("HEAD", HttpRequest.BodyPublishers.noBody()) // HEAD request
.timeout(java.time.Duration.ofSeconds(1))
.build();


try {
HttpResponse<Void> response = client.send(request, HttpResponse.BodyHandlers.discarding());
} catch (InterruptedException e) {
logger.error("InterruptedException Enpoint is not reachable: {}, {}", endpoint, ExceptionUtils.getStackTrace(e));
Thread.currentThread().interrupt();
return false;
} catch (Exception e) {
logger.error("Enpoint is not reachable: {}, {}", endpoint, ExceptionUtils.getStackTrace(e));
return false;
}
return true;


}

/**
* If multiple threads enter the method at the same time, the following method does not work correctly.
* So we have made it synchronized
Expand Down Expand Up @@ -1472,13 +1532,13 @@ public void closeStreamFetchers() {

public void waitUntilLiveStreamsStopped() {
int i = 0;
int waitPeriod = 1000;
int waitPeriod = 500;
boolean everythingHasStopped = true;
while(getDataStore().getLocalLiveBroadcastCount(getServerSettings().getHostAddress()) > 0) {
try {
if (i > 3) {
logger.warn("Waiting for active broadcasts number decrease to zero for app: {}"
+ "total wait time: {}ms", getScope().getName(), i*waitPeriod);
+ " total wait time: {}ms", getScope().getName(), i*waitPeriod);
}
if (i>10) {
logger.error("Not all live streams're stopped gracefully. It will update the streams' status to finished explicitly");
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/antmedia/datastore/db/DataStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,7 @@ public long getActiveBroadcastCount(Map<String, String> broadcastMap, Gson gson,
Broadcast broadcast = gson.fromJson(broadcastString, Broadcast.class);
String status = broadcast.getStatus();
if (IAntMediaStreamHandler.BROADCAST_STATUS_BROADCASTING.equals(status) &&
(StringUtils.isAnyBlank(hostAddress, broadcast.getOriginAdress()) || hostAddress.equals(broadcast.getOriginAdress())))
(StringUtils.isBlank(hostAddress) || hostAddress.equals(broadcast.getOriginAdress())))
{
activeBroadcastCount++;
}
Expand All @@ -688,7 +688,7 @@ public List<Broadcast> getActiveBroadcastList(Map<String, String> broadcastMap,

String status = broadcast.getStatus();
if (IAntMediaStreamHandler.BROADCAST_STATUS_BROADCASTING.equals(status) &&
(StringUtils.isAnyBlank(hostAddress, broadcast.getOriginAdress()) || hostAddress.equals(broadcast.getOriginAdress())))
(StringUtils.isBlank(hostAddress) || hostAddress.equals(broadcast.getOriginAdress())))
{
broadcastList.add(broadcast);
}
Expand Down
8 changes: 2 additions & 6 deletions src/main/java/io/antmedia/datastore/db/MongoStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -1368,9 +1368,7 @@ public long getLocalLiveBroadcastCount(String hostAddress) {
return datastore.find(Broadcast.class)
.filter(Filters.and(
Filters.or(
Filters.eq(ORIGIN_ADDRESS, hostAddress),
Filters.exists(ORIGIN_ADDRESS).not()
),
Filters.eq(ORIGIN_ADDRESS, hostAddress) ),
Filters.eq(STATUS, IAntMediaStreamHandler.BROADCAST_STATUS_BROADCASTING)
)).count();
}
Expand All @@ -1383,9 +1381,7 @@ public List<Broadcast> getLocalLiveBroadcasts(String hostAddress)
return datastore.find(Broadcast.class)
.filter(Filters.and(
Filters.or(
Filters.eq(ORIGIN_ADDRESS, hostAddress),
Filters.exists(ORIGIN_ADDRESS).not()
),
Filters.eq(ORIGIN_ADDRESS, hostAddress) ),
Filters.eq(STATUS, IAntMediaStreamHandler.BROADCAST_STATUS_BROADCASTING)
)).iterator().toList();
}
Expand Down
11 changes: 1 addition & 10 deletions src/main/java/io/antmedia/muxer/MuxAdaptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -814,16 +814,7 @@ public void registerToMainTrackIfExists() {
Broadcast mainBroadcast = getDataStore().get(mainTrack);
if(mainBroadcast == null)
{
mainBroadcast = new Broadcast();
try {
mainBroadcast.setStreamId(mainTrack);
} catch (Exception e) {
logger.error(ExceptionUtils.getStackTrace(e));
}
mainBroadcast.setZombi(true);
mainBroadcast.setStatus(BROADCAST_STATUS_BROADCASTING);
mainBroadcast.getSubTrackStreamIds().add(streamId);
getDataStore().save(mainBroadcast);
mainBroadcast = AntMediaApplicationAdapter.saveMainBroadcast(streamId, mainTrack, getDataStore());
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ public boolean isPublishAllowed(IScope scope, String name, String mode, Map<Stri
else
{
result = true;
if (AntMediaApplicationAdapter.isStreaming(broadcast)) {
if (AntMediaApplicationAdapter.isStreaming(broadcast) &&
AntMediaApplicationAdapter.isInstanceAlive(broadcast.getOriginAdress(), getAppAdaptor(scope).getServerSettings().getHostAddress(), getAppAdaptor(scope).getServerSettings().getDefaultHttpPort(), scope.getName())
)
{
logger.info("Does not accept stream:{} because it's streaming", name);
result = false;
}
Expand Down Expand Up @@ -95,6 +98,10 @@ public boolean isPublishAllowed(IScope scope, String name, String mode, Map<Stri
public ILicenceService getLicenceService(IScope scope) {
return (ILicenceService)scope.getContext().getBean(ILicenceService.BeanName.LICENCE_SERVICE.toString());
}

public AntMediaApplicationAdapter getAppAdaptor(IScope scope) {
return (AntMediaApplicationAdapter)scope.getContext().getApplicationContext().getBean(AntMediaApplicationAdapter.BEAN_NAME);
}

public DataStore getDatastore() {
if (dataStore == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.antmedia.muxer.IAntMediaStreamHandler;
import io.antmedia.muxer.MuxAdaptor;
import io.antmedia.rest.model.Result;
import io.antmedia.settings.ServerSettings;
import io.antmedia.shutdown.AMSShutdownManager;
import io.antmedia.streamsource.StreamFetcher.IStreamFetcherListener;
import io.vertx.core.Vertx;
Expand Down Expand Up @@ -75,12 +76,15 @@ public class StreamFetcherManager {

boolean serverShuttingDown = false;

private ServerSettings serverSettings;


public StreamFetcherManager(Vertx vertx, DataStore datastore,IScope scope) {
this.vertx = vertx;
this.datastore = datastore;
this.scope=scope;
this.appSettings = (AppSettings) scope.getContext().getBean(AppSettings.BEAN_NAME);
this.serverSettings = (ServerSettings) scope.getContext().getBean(ServerSettings.BEAN_NAME);
this.licenseService = (ILicenceService)scope.getContext().getBean(ILicenceService.BeanName.LICENCE_SERVICE.toString());
AMSShutdownManager.getInstance().subscribe(()-> shuttingDown());
}
Expand Down Expand Up @@ -119,7 +123,8 @@ public boolean isStreamRunning(Broadcast broadcast) {

if (!isStreamLive) {
//this stream may be fetching in somewhere in the cluster
isStreamLive = AntMediaApplicationAdapter.isStreaming(broadcast);
isStreamLive = AntMediaApplicationAdapter.isStreaming(broadcast) &&
AntMediaApplicationAdapter.isInstanceAlive(broadcast.getOriginAdress(), serverSettings.getHostAddress(), serverSettings.getDefaultHttpPort(), scope.getName());
}

return isStreamLive;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
import io.antmedia.rest.model.Result;
import io.antmedia.security.AcceptOnlyStreamsInDataStore;
import io.antmedia.settings.ServerSettings;
import io.antmedia.statistic.HlsViewerStats;
import io.antmedia.statistic.type.WebRTCAudioReceiveStats;
import io.antmedia.statistic.type.WebRTCAudioSendStats;
import io.antmedia.statistic.type.WebRTCVideoReceiveStats;
Expand Down Expand Up @@ -228,6 +229,30 @@ public void testFirebase() throws IOException, FirebaseMessagingException {
}

}

@Test
public void testEndpointReachable() {
boolean endpointReachable = AntMediaApplicationAdapter.isEndpointReachable("http://antmedia.io/not_exist");
//it should be true because we're just checking if it's reachable
assertTrue(endpointReachable);

endpointReachable = AntMediaApplicationAdapter.isEndpointReachable("http://antmedia.io:45454/not_exist");
assertFalse(endpointReachable);

boolean instanceAlive = AntMediaApplicationAdapter.isInstanceAlive("antmedia.io", null, 80, "");
assertTrue(instanceAlive);

instanceAlive = AntMediaApplicationAdapter.isInstanceAlive("antmedia.io", null, 4545, "");
assertFalse(instanceAlive);

instanceAlive = AntMediaApplicationAdapter.isInstanceAlive("", null, 4545, "");
assertTrue(instanceAlive);

instanceAlive = AntMediaApplicationAdapter.isInstanceAlive("localhost", "localhost", 4545, "");
assertTrue(instanceAlive);

}

@Test
public void testIsIncomingTimeValid() {
AppSettings newSettings = new AppSettings();
Expand Down Expand Up @@ -815,8 +840,9 @@ public void testHookAfterDefined()

assertEquals(hookURL, spyAdaptor.getListenerHookURL(broadcast));


spyAdaptor = Mockito.spy(adapter);
Mockito.doNothing().when(spyAdaptor).resetHLSStats(Mockito.anyString());
Mockito.doNothing().when(spyAdaptor).resetDASHStats(Mockito.anyString());
appSettings = new AppSettings();
spyAdaptor.setServerSettings(new ServerSettings());
spyAdaptor.setAppSettings(appSettings);
Expand Down Expand Up @@ -2099,6 +2125,16 @@ public void testAddRemovePacketListener() {


}

@Test
public void testSaveMainBroadcast()
{
DataStore dataStore = new InMemoryDataStore("test");
Broadcast mainTrack = AntMediaApplicationAdapter.saveMainBroadcast("streamId", "mainTrackId", dataStore);
assertNotNull(mainTrack);
//origin address must be null because main track is not a real stream
assertNull(mainTrack.getOriginAdress());
}

@Test
public void testAppDeletion()
Expand Down
31 changes: 31 additions & 0 deletions src/test/java/io/antmedia/test/StreamSchedularUnitTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import io.antmedia.FFmpegUtilities;
import io.antmedia.datastore.db.DataStore;
import io.antmedia.datastore.db.IDataStoreFactory;
import io.antmedia.datastore.db.InMemoryDataStore;
import io.antmedia.datastore.db.MapDBStore;
import io.antmedia.datastore.db.types.Broadcast;
import io.antmedia.datastore.db.types.Broadcast.PlayListItem;
Expand Down Expand Up @@ -701,6 +702,36 @@ public void testSkipPlaylistItem() {
fail(e.getMessage());
}
}

@Test
public void testIsStreamRunning()
{
DataStore dataStore = new InMemoryDataStore("test");
StreamFetcherManager streamFetcherManager = Mockito.spy(new StreamFetcherManager(vertx, dataStore, appScope));

Broadcast broadcast = new Broadcast();

dataStore.save(broadcast);

boolean isStreamRunning = streamFetcherManager.isStreamRunning(broadcast);
assertFalse(isStreamRunning);

broadcast.setStatus(AntMediaApplicationAdapter.BROADCAST_STATUS_BROADCASTING);
broadcast.setUpdateTime(System.currentTimeMillis());

isStreamRunning = streamFetcherManager.isStreamRunning(broadcast);
assertTrue(isStreamRunning);

broadcast.setOriginAdress("not.accessible.antmedia.io");
isStreamRunning = streamFetcherManager.isStreamRunning(broadcast);
assertFalse(isStreamRunning);

broadcast.setUpdateTime(0);

isStreamRunning = streamFetcherManager.isStreamRunning(broadcast);
assertFalse(isStreamRunning);

}

@Test
public void testControlStreamFetchersPlayListAndRestart() {
Expand Down
Loading

0 comments on commit a334a2b

Please sign in to comment.