Skip to content

Commit

Permalink
test trials
Browse files Browse the repository at this point in the history
  • Loading branch information
lastpeony committed Nov 27, 2024
1 parent b61e2d2 commit c1b0eb5
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 7 deletions.
24 changes: 20 additions & 4 deletions src/main/java/io/antmedia/datastore/db/MongoStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import io.antmedia.muxer.IAntMediaStreamHandler;
import io.antmedia.muxer.MuxAdaptor;
import org.springframework.cache.Cache;
import org.springframework.cache.caffeine.CaffeineCache;
import org.springframework.cache.caffeine.CaffeineCacheManager;

public class MongoStore extends DataStore {
Expand All @@ -84,8 +85,8 @@ public class MongoStore extends DataStore {
private Datastore detectionMap;
private Datastore conferenceRoomDatastore;
private MongoClient mongoClient;
public CaffeineCacheManager cacheManager;
public Cache subscriberCache;
public CaffeineCacheManager cacheManager;
public CaffeineCache subscriberCache;


protected static Logger logger = LoggerFactory.getLogger(MongoStore.class);
Expand Down Expand Up @@ -1266,6 +1267,7 @@ public boolean addSubscriber(String streamId, Subscriber subscriber) {
executedQueryCount++;

subscriberDatastore.save(subscriber);

getSubscriberCache().put(getSubscriberCacheKey(streamId, subscriber.getSubscriberId()), subscriber);
result = true;
} catch (Exception e) {
Expand Down Expand Up @@ -1410,6 +1412,20 @@ public boolean resetSubscribersConnectedStatus() {

UpdateResult execute = subscriberDatastore.find(Subscriber.class).update(new UpdateOptions().multi(true), set("connected", false));
result = execute.getMatchedCount() > 1;
if(result){

getSubscriberCache().getNativeCache().asMap().forEach((key, value) -> {
if (value instanceof Subscriber) {
Subscriber subscriber = (Subscriber) value;
if(subscriber.getSubscriberId() != null){
subscriber.setConnected(false);
getSubscriberCache().put(key, subscriber);
}

}
});

}
} catch (Exception e) {
logger.error(ExceptionUtils.getStackTrace(e));
}
Expand Down Expand Up @@ -1879,9 +1895,9 @@ public CaffeineCacheManager getCacheManager(){
return cacheManager;
}

public Cache getSubscriberCache() {
public CaffeineCache getSubscriberCache() {
if(subscriberCache == null){
subscriberCache = cacheManager.getCache("subscriberCache");
subscriberCache = (CaffeineCache) cacheManager.getCache("subscriberCache");
}

return subscriberCache;
Expand Down
11 changes: 8 additions & 3 deletions src/test/java/io/antmedia/test/db/DBStoresUnitTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ public void testMongoStore() throws Exception {

dataStore = new MongoStore("127.0.0.1", "", "", "testdb");

/*
testSaveDuplicateStreamId((MongoStore)dataStore);
testLocalLiveBroadcast(dataStore);
Expand Down Expand Up @@ -346,7 +346,9 @@ public void testMongoStore() throws Exception {
testGetSubtracks(dataStore);
testGetSubtracksWithStatus(dataStore);
testGetSubtracksWithStatus(dataStore);*/
testTimeBasedSubscriberOperations(dataStore);

testSubscriberCache(dataStore);

dataStore.close(true);
Expand Down Expand Up @@ -2307,7 +2309,10 @@ public void testTimeBasedSubscriberOperations(DataStore store) {
store.addSubscriberConnectionEvent(subscriberPlay.getStreamId(), subscriberPlay.getSubscriberId(), connected);
// isConnected should be true again
assertTrue(store.isSubscriberConnected(subscriberPlay.getStreamId(), subscriberPlay.getSubscriberId()));


subscribers = store.listAllSubscribers(streamId, 0, 10);


// reset connection status
assertTrue(store.resetSubscribersConnectedStatus());
// connection status should false again
Expand Down

0 comments on commit c1b0eb5

Please sign in to comment.