diff --git a/src/main/java/io/antmedia/datastore/db/MongoStore.java b/src/main/java/io/antmedia/datastore/db/MongoStore.java index deab66378..7ec864df1 100644 --- a/src/main/java/io/antmedia/datastore/db/MongoStore.java +++ b/src/main/java/io/antmedia/datastore/db/MongoStore.java @@ -66,6 +66,7 @@ import io.antmedia.muxer.IAntMediaStreamHandler; import io.antmedia.muxer.MuxAdaptor; import org.springframework.cache.Cache; +import org.springframework.cache.caffeine.CaffeineCache; import org.springframework.cache.caffeine.CaffeineCacheManager; public class MongoStore extends DataStore { @@ -84,8 +85,8 @@ public class MongoStore extends DataStore { private Datastore detectionMap; private Datastore conferenceRoomDatastore; private MongoClient mongoClient; - public CaffeineCacheManager cacheManager; - public Cache subscriberCache; + public CaffeineCacheManager cacheManager; + public CaffeineCache subscriberCache; protected static Logger logger = LoggerFactory.getLogger(MongoStore.class); @@ -1266,6 +1267,7 @@ public boolean addSubscriber(String streamId, Subscriber subscriber) { executedQueryCount++; subscriberDatastore.save(subscriber); + getSubscriberCache().put(getSubscriberCacheKey(streamId, subscriber.getSubscriberId()), subscriber); result = true; } catch (Exception e) { @@ -1410,6 +1412,20 @@ public boolean resetSubscribersConnectedStatus() { UpdateResult execute = subscriberDatastore.find(Subscriber.class).update(new UpdateOptions().multi(true), set("connected", false)); result = execute.getMatchedCount() > 1; + if(result){ + + getSubscriberCache().getNativeCache().asMap().forEach((key, value) -> { + if (value instanceof Subscriber) { + Subscriber subscriber = (Subscriber) value; + if(subscriber.getSubscriberId() != null){ + subscriber.setConnected(false); + getSubscriberCache().put(key, subscriber); + } + + } + }); + + } } catch (Exception e) { logger.error(ExceptionUtils.getStackTrace(e)); } @@ -1879,9 +1895,9 @@ public CaffeineCacheManager getCacheManager(){ return cacheManager; } - public Cache getSubscriberCache() { + public CaffeineCache getSubscriberCache() { if(subscriberCache == null){ - subscriberCache = cacheManager.getCache("subscriberCache"); + subscriberCache = (CaffeineCache) cacheManager.getCache("subscriberCache"); } return subscriberCache; diff --git a/src/test/java/io/antmedia/test/db/DBStoresUnitTest.java b/src/test/java/io/antmedia/test/db/DBStoresUnitTest.java index 8fb0ecb26..f30564353 100644 --- a/src/test/java/io/antmedia/test/db/DBStoresUnitTest.java +++ b/src/test/java/io/antmedia/test/db/DBStoresUnitTest.java @@ -294,7 +294,7 @@ public void testMongoStore() throws Exception { dataStore = new MongoStore("127.0.0.1", "", "", "testdb"); - + /* testSaveDuplicateStreamId((MongoStore)dataStore); testLocalLiveBroadcast(dataStore); @@ -346,7 +346,9 @@ public void testMongoStore() throws Exception { testGetSubtracks(dataStore); - testGetSubtracksWithStatus(dataStore); + testGetSubtracksWithStatus(dataStore);*/ + testTimeBasedSubscriberOperations(dataStore); + testSubscriberCache(dataStore); dataStore.close(true); @@ -2307,7 +2309,10 @@ public void testTimeBasedSubscriberOperations(DataStore store) { store.addSubscriberConnectionEvent(subscriberPlay.getStreamId(), subscriberPlay.getSubscriberId(), connected); // isConnected should be true again assertTrue(store.isSubscriberConnected(subscriberPlay.getStreamId(), subscriberPlay.getSubscriberId())); - + + subscribers = store.listAllSubscribers(streamId, 0, 10); + + // reset connection status assertTrue(store.resetSubscribersConnectedStatus()); // connection status should false again