From ce88e71e738ac7bb2cd5d63e4e314e2de82f72ef Mon Sep 17 00:00:00 2001 From: Mila <107142260+milaGGL@users.noreply.github.com> Date: Mon, 11 Mar 2024 15:08:28 -0400 Subject: [PATCH] snapshot listeners source from cache (#7982) --- .changeset/smart-games-cheer.md | 5 + common/api-review/firestore.api.md | 4 + docs-devsite/firestore_.md | 13 + .../firestore_.snapshotlistenoptions.md | 11 + packages/firestore/src/api.ts | 6 +- packages/firestore/src/api/reference_impl.ts | 21 +- packages/firestore/src/core/event_manager.ts | 168 +++- .../firestore/src/core/firestore_client.ts | 12 +- .../firestore/src/core/sync_engine_impl.ts | 104 ++- .../src/local/shared_client_state.ts | 2 +- .../test/integration/api/query.test.ts | 2 +- .../api/snasphot_listener_source.test.ts | 786 ++++++++++++++++ .../test/unit/core/event_manager.test.ts | 38 +- .../test/unit/specs/bundle_spec.test.ts | 2 +- .../unit/specs/listen_source_spec.test.ts | 843 ++++++++++++++++++ .../firestore/test/unit/specs/spec_builder.ts | 86 +- .../test/unit/specs/spec_test_runner.ts | 33 +- 17 files changed, 2055 insertions(+), 81 deletions(-) create mode 100644 .changeset/smart-games-cheer.md create mode 100644 packages/firestore/test/integration/api/snasphot_listener_source.test.ts create mode 100644 packages/firestore/test/unit/specs/listen_source_spec.test.ts diff --git a/.changeset/smart-games-cheer.md b/.changeset/smart-games-cheer.md new file mode 100644 index 00000000000..e073d62c03c --- /dev/null +++ b/.changeset/smart-games-cheer.md @@ -0,0 +1,5 @@ +--- +'@firebase/firestore': minor +'firebase': minor +--- +Enable snapshot listener option to retrieve data from local cache only. diff --git a/common/api-review/firestore.api.md b/common/api-review/firestore.api.md index ee4fcc842ff..f79ef52442e 100644 --- a/common/api-review/firestore.api.md +++ b/common/api-review/firestore.api.md @@ -350,6 +350,9 @@ export function limit(limit: number): QueryLimitConstraint; // @public export function limitToLast(limit: number): QueryLimitConstraint; +// @public +export type ListenSource = 'default' | 'cache'; + // @public export function loadBundle(firestore: Firestore, bundleData: ReadableStream | ArrayBuffer | string): LoadBundleTask; @@ -651,6 +654,7 @@ export function snapshotEqual(le // @public export interface SnapshotListenOptions { readonly includeMetadataChanges?: boolean; + readonly source?: ListenSource; } // @public diff --git a/docs-devsite/firestore_.md b/docs-devsite/firestore_.md index 07bb175ee86..74a0c356523 100644 --- a/docs-devsite/firestore_.md +++ b/docs-devsite/firestore_.md @@ -204,6 +204,7 @@ https://github.com/firebase/firebase-js-sdk | [DocumentChangeType](./firestore_.md#documentchangetype) | The type of a DocumentChange may be 'added', 'removed', or 'modified'. | | [FirestoreErrorCode](./firestore_.md#firestoreerrorcode) | The set of Firestore status codes. The codes are the same at the ones exposed by gRPC here: https://github.com/grpc/grpc/blob/master/doc/statuscodes.mdPossible values: - 'cancelled': The operation was cancelled (typically by the caller). - 'unknown': Unknown error or an error from a different error domain. - 'invalid-argument': Client specified an invalid argument. Note that this differs from 'failed-precondition'. 'invalid-argument' indicates arguments that are problematic regardless of the state of the system (e.g. an invalid field name). - 'deadline-exceeded': Deadline expired before operation could complete. For operations that change the state of the system, this error may be returned even if the operation has completed successfully. For example, a successful response from a server could have been delayed long enough for the deadline to expire. - 'not-found': Some requested document was not found. - 'already-exists': Some document that we attempted to create already exists. - 'permission-denied': The caller does not have permission to execute the specified operation. - 'resource-exhausted': Some resource has been exhausted, perhaps a per-user quota, or perhaps the entire file system is out of space. - 'failed-precondition': Operation was rejected because the system is not in a state required for the operation's execution. - 'aborted': The operation was aborted, typically due to a concurrency issue like transaction aborts, etc. - 'out-of-range': Operation was attempted past the valid range. - 'unimplemented': Operation is not implemented or not supported/enabled. - 'internal': Internal errors. Means some invariants expected by underlying system has been broken. If you see one of these errors, something is very broken. - 'unavailable': The service is currently unavailable. This is most likely a transient condition and may be corrected by retrying with a backoff. - 'data-loss': Unrecoverable data loss or corruption. - 'unauthenticated': The request does not have valid authentication credentials for the operation. | | [FirestoreLocalCache](./firestore_.md#firestorelocalcache) | Union type from all supported SDK cache layer. | +| [ListenSource](./firestore_.md#listensource) | Describe the source a query listens to.Set to default to listen to both cache and server changes. Set to cache to listen to changes in cache only. | | [MemoryGarbageCollector](./firestore_.md#memorygarbagecollector) | Union type from all support gabage collectors for memory local cache. | | [NestedUpdateFields](./firestore_.md#nestedupdatefields) | For each field (e.g. 'bar'), find all nested keys (e.g. {'bar.baz': T1, 'bar.qux': T2}). Intersect them together to make a single map containing all possible keys that are all marked as optional | | [OrderByDirection](./firestore_.md#orderbydirection) | The direction of a [orderBy()](./firestore_.md#orderby_006d61f) clause is specified as 'desc' or 'asc' (descending or ascending). | @@ -2551,6 +2552,18 @@ Union type from all supported SDK cache layer. export declare type FirestoreLocalCache = MemoryLocalCache | PersistentLocalCache; ``` +## ListenSource + +Describe the source a query listens to. + +Set to `default` to listen to both cache and server changes. Set to `cache` to listen to changes in cache only. + +Signature: + +```typescript +export declare type ListenSource = 'default' | 'cache'; +``` + ## MemoryGarbageCollector Union type from all support gabage collectors for memory local cache. diff --git a/docs-devsite/firestore_.snapshotlistenoptions.md b/docs-devsite/firestore_.snapshotlistenoptions.md index 7551b0a2f23..33d769f4fd9 100644 --- a/docs-devsite/firestore_.snapshotlistenoptions.md +++ b/docs-devsite/firestore_.snapshotlistenoptions.md @@ -23,6 +23,7 @@ export declare interface SnapshotListenOptions | Property | Type | Description | | --- | --- | --- | | [includeMetadataChanges](./firestore_.snapshotlistenoptions.md#snapshotlistenoptionsincludemetadatachanges) | boolean | Include a change even if only the metadata of the query or of a document changed. Default is false. | +| [source](./firestore_.snapshotlistenoptions.md#snapshotlistenoptionssource) | [ListenSource](./firestore_.md#listensource) | Set the source the query listens to. Default to "default", which listens to both cache and server. | ## SnapshotListenOptions.includeMetadataChanges @@ -33,3 +34,13 @@ Include a change even if only the metadata of the query or of a document changed ```typescript readonly includeMetadataChanges?: boolean; ``` + +## SnapshotListenOptions.source + +Set the source the query listens to. Default to "default", which listens to both cache and server. + +Signature: + +```typescript +readonly source?: ListenSource; +``` diff --git a/packages/firestore/src/api.ts b/packages/firestore/src/api.ts index 510d95c8a89..bcfa6dc5f34 100644 --- a/packages/firestore/src/api.ts +++ b/packages/firestore/src/api.ts @@ -139,7 +139,11 @@ export { WhereFilterOp } from './api/filter'; -export { SnapshotListenOptions, Unsubscribe } from './api/reference_impl'; +export { + ListenSource, + SnapshotListenOptions, + Unsubscribe +} from './api/reference_impl'; export { TransactionOptions } from './api/transaction_options'; diff --git a/packages/firestore/src/api/reference_impl.ts b/packages/firestore/src/api/reference_impl.ts index 13cfb09f518..e730fb40da7 100644 --- a/packages/firestore/src/api/reference_impl.ts +++ b/packages/firestore/src/api/reference_impl.ts @@ -24,6 +24,7 @@ import { NextFn, PartialObserver } from '../api/observer'; +import { ListenerDataSource } from '../core/event_manager'; import { firestoreClientAddSnapshotsInSyncListener, firestoreClientGetDocumentFromLocalCache, @@ -78,8 +79,22 @@ export interface SnapshotListenOptions { * changed. Default is false. */ readonly includeMetadataChanges?: boolean; + + /** + * Set the source the query listens to. Default to "default", which + * listens to both cache and server. + */ + readonly source?: ListenSource; } +/** + * Describe the source a query listens to. + * + * Set to `default` to listen to both cache and server changes. Set to `cache` + * to listen to changes in cache only. + */ +export type ListenSource = 'default' | 'cache'; + /** * Reads the document referred to by this `DocumentReference`. * @@ -668,7 +683,8 @@ export function onSnapshot( reference = getModularInstance(reference); let options: SnapshotListenOptions = { - includeMetadataChanges: false + includeMetadataChanges: false, + source: 'default' }; let currArg = 0; if (typeof args[currArg] === 'object' && !isPartialObserver(args[currArg])) { @@ -677,7 +693,8 @@ export function onSnapshot( } const internalOptions = { - includeMetadataChanges: options.includeMetadataChanges + includeMetadataChanges: options.includeMetadataChanges, + source: options.source as ListenerDataSource }; if (isPartialObserver(args[currArg])) { diff --git a/packages/firestore/src/core/event_manager.ts b/packages/firestore/src/core/event_manager.ts index 1723abd15d9..b53c45669cb 100644 --- a/packages/firestore/src/core/event_manager.ts +++ b/packages/firestore/src/core/event_manager.ts @@ -32,6 +32,11 @@ import { ChangeType, DocumentViewChange, ViewSnapshot } from './view_snapshot'; class QueryListenersInfo { viewSnap: ViewSnapshot | undefined = undefined; listeners: QueryListener[] = []; + + // Helper methods that checks if the query has listeners that listening to remote store + hasRemoteListeners(): boolean { + return this.listeners.some(listener => listener.listensToRemoteStore()); + } } /** @@ -52,8 +57,13 @@ export interface Observer { * allows users to tree-shake the Watch logic. */ export interface EventManager { - onListen?: (query: Query) => Promise; - onUnlisten?: (query: Query) => Promise; + onListen?: ( + query: Query, + enableRemoteListen: boolean + ) => Promise; + onUnlisten?: (query: Query, disableRemoteListen: boolean) => Promise; + onFirstRemoteStoreListen?: (query: Query) => Promise; + onLastRemoteStoreUnlisten?: (query: Query) => Promise; } export function newEventManager(): EventManager { @@ -71,9 +81,50 @@ export class EventManagerImpl implements EventManager { snapshotsInSyncListeners: Set> = new Set(); /** Callback invoked when a Query is first listen to. */ - onListen?: (query: Query) => Promise; + onListen?: ( + query: Query, + enableRemoteListen: boolean + ) => Promise; /** Callback invoked once all listeners to a Query are removed. */ - onUnlisten?: (query: Query) => Promise; + onUnlisten?: (query: Query, disableRemoteListen: boolean) => Promise; + + /** + * Callback invoked when a Query starts listening to the remote store, while + * already listening to the cache. + */ + onFirstRemoteStoreListen?: (query: Query) => Promise; + /** + * Callback invoked when a Query stops listening to the remote store, while + * still listening to the cache. + */ + onLastRemoteStoreUnlisten?: (query: Query) => Promise; +} + +function validateEventManager(eventManagerImpl: EventManagerImpl): void { + debugAssert(!!eventManagerImpl.onListen, 'onListen not set'); + debugAssert( + !!eventManagerImpl.onFirstRemoteStoreListen, + 'onFirstRemoteStoreListen not set' + ); + debugAssert(!!eventManagerImpl.onUnlisten, 'onUnlisten not set'); + debugAssert( + !!eventManagerImpl.onLastRemoteStoreUnlisten, + 'onLastRemoteStoreUnlisten not set' + ); +} + +const enum ListenerSetupAction { + InitializeLocalListenAndRequireWatchConnection, + InitializeLocalListenOnly, + RequireWatchConnectionOnly, + NoActionRequired +} + +const enum ListenerRemovalAction { + TerminateLocalListenAndRequireWatchDisconnection, + TerminateLocalListenOnly, + RequireWatchDisconnectionOnly, + NoActionRequired } export async function eventManagerListen( @@ -81,28 +132,53 @@ export async function eventManagerListen( listener: QueryListener ): Promise { const eventManagerImpl = debugCast(eventManager, EventManagerImpl); + validateEventManager(eventManagerImpl); + + let listenerAction = ListenerSetupAction.NoActionRequired; - debugAssert(!!eventManagerImpl.onListen, 'onListen not set'); const query = listener.query; - let firstListen = false; let queryInfo = eventManagerImpl.queries.get(query); if (!queryInfo) { - firstListen = true; queryInfo = new QueryListenersInfo(); + listenerAction = listener.listensToRemoteStore() + ? ListenerSetupAction.InitializeLocalListenAndRequireWatchConnection + : ListenerSetupAction.InitializeLocalListenOnly; + } else if ( + !queryInfo.hasRemoteListeners() && + listener.listensToRemoteStore() + ) { + // Query has been listening to local cache, and tries to add a new listener sourced from watch. + listenerAction = ListenerSetupAction.RequireWatchConnectionOnly; } - if (firstListen) { - try { - queryInfo.viewSnap = await eventManagerImpl.onListen(query); - } catch (e) { - const firestoreError = wrapInUserErrorIfRecoverable( - e as Error, - `Initialization of query '${stringifyQuery(listener.query)}' failed` - ); - listener.onError(firestoreError); - return; + try { + switch (listenerAction) { + case ListenerSetupAction.InitializeLocalListenAndRequireWatchConnection: + queryInfo.viewSnap = await eventManagerImpl.onListen!( + query, + /** enableRemoteListen= */ true + ); + break; + case ListenerSetupAction.InitializeLocalListenOnly: + queryInfo.viewSnap = await eventManagerImpl.onListen!( + query, + /** enableRemoteListen= */ false + ); + break; + case ListenerSetupAction.RequireWatchConnectionOnly: + await eventManagerImpl.onFirstRemoteStoreListen!(query); + break; + default: + break; } + } catch (e) { + const firestoreError = wrapInUserErrorIfRecoverable( + e as Error, + `Initialization of query '${stringifyQuery(listener.query)}' failed` + ); + listener.onError(firestoreError); + return; } eventManagerImpl.queries.set(query, queryInfo); @@ -130,23 +206,47 @@ export async function eventManagerUnlisten( listener: QueryListener ): Promise { const eventManagerImpl = debugCast(eventManager, EventManagerImpl); + validateEventManager(eventManagerImpl); - debugAssert(!!eventManagerImpl.onUnlisten, 'onUnlisten not set'); const query = listener.query; - let lastListen = false; + let listenerAction = ListenerRemovalAction.NoActionRequired; const queryInfo = eventManagerImpl.queries.get(query); if (queryInfo) { const i = queryInfo.listeners.indexOf(listener); if (i >= 0) { queryInfo.listeners.splice(i, 1); - lastListen = queryInfo.listeners.length === 0; + + if (queryInfo.listeners.length === 0) { + listenerAction = listener.listensToRemoteStore() + ? ListenerRemovalAction.TerminateLocalListenAndRequireWatchDisconnection + : ListenerRemovalAction.TerminateLocalListenOnly; + } else if ( + !queryInfo.hasRemoteListeners() && + listener.listensToRemoteStore() + ) { + // The removed listener is the last one that sourced from watch. + listenerAction = ListenerRemovalAction.RequireWatchDisconnectionOnly; + } } } - - if (lastListen) { - eventManagerImpl.queries.delete(query); - return eventManagerImpl.onUnlisten(query); + switch (listenerAction) { + case ListenerRemovalAction.TerminateLocalListenAndRequireWatchDisconnection: + eventManagerImpl.queries.delete(query); + return eventManagerImpl.onUnlisten!( + query, + /** disableRemoteListen= */ true + ); + case ListenerRemovalAction.TerminateLocalListenOnly: + eventManagerImpl.queries.delete(query); + return eventManagerImpl.onUnlisten!( + query, + /** disableRemoteListen= */ false + ); + case ListenerRemovalAction.RequireWatchDisconnectionOnly: + return eventManagerImpl.onLastRemoteStoreUnlisten!(query); + default: + return; } } @@ -241,6 +341,14 @@ function raiseSnapshotsInSyncEvent(eventManagerImpl: EventManagerImpl): void { }); } +export enum ListenerDataSource { + /** Listen to both cache and server changes */ + Default = 'default', + + /** Listen to changes in cache only */ + Cache = 'cache' +} + export interface ListenOptions { /** Raise events even when only the metadata changes */ readonly includeMetadataChanges?: boolean; @@ -250,6 +358,9 @@ export interface ListenOptions { * offline. */ readonly waitForSyncWhenOnline?: boolean; + + /** Set the source events raised from. */ + readonly source?: ListenerDataSource; } /** @@ -359,6 +470,11 @@ export class QueryListener { return true; } + // Always raise event if listening to cache + if (!this.listensToRemoteStore()) { + return true; + } + // NOTE: We consider OnlineState.Unknown as online (it should become Offline // or Online if we wait long enough). const maybeOnline = onlineState !== OnlineState.Offline; @@ -417,4 +533,8 @@ export class QueryListener { this.raisedInitialEvent = true; this.queryObserver.next(snap); } + + listensToRemoteStore(): boolean { + return this.options.source !== ListenerDataSource.Cache; + } } diff --git a/packages/firestore/src/core/firestore_client.ts b/packages/firestore/src/core/firestore_client.ts index d1b66d86e2f..6e21737380b 100644 --- a/packages/firestore/src/core/firestore_client.ts +++ b/packages/firestore/src/core/firestore_client.ts @@ -86,7 +86,9 @@ import { syncEngineLoadBundle, syncEngineRegisterPendingWritesCallback, syncEngineUnlisten, - syncEngineWrite + syncEngineWrite, + triggerRemoteStoreListen, + triggerRemoteStoreUnlisten } from './sync_engine_impl'; import { Transaction } from './transaction'; import { TransactionOptions } from './transaction_options'; @@ -397,6 +399,14 @@ export async function getEventManager( null, onlineComponentProvider.syncEngine ); + eventManager.onFirstRemoteStoreListen = triggerRemoteStoreListen.bind( + null, + onlineComponentProvider.syncEngine + ); + eventManager.onLastRemoteStoreUnlisten = triggerRemoteStoreUnlisten.bind( + null, + onlineComponentProvider.syncEngine + ); return eventManager; } diff --git a/packages/firestore/src/core/sync_engine_impl.ts b/packages/firestore/src/core/sync_engine_impl.ts index 18a1dc681f5..f4db6f4a5bd 100644 --- a/packages/firestore/src/core/sync_engine_impl.ts +++ b/packages/firestore/src/core/sync_engine_impl.ts @@ -292,11 +292,11 @@ export function newSyncEngine( */ export async function syncEngineListen( syncEngine: SyncEngine, - query: Query + query: Query, + shouldListenToRemote: boolean = true ): Promise { const syncEngineImpl = ensureWatchCallbacks(syncEngine); - let targetId; let viewSnapshot; const queryView = syncEngineImpl.queryViewsByQuery.get(query); @@ -307,19 +307,58 @@ export async function syncEngineListen( // behalf of another tab and the user of the primary also starts listening // to the query. EventManager will not have an assigned target ID in this // case and calls `listen` to obtain this ID. - targetId = queryView.targetId; - syncEngineImpl.sharedClientState.addLocalQueryTarget(targetId); + syncEngineImpl.sharedClientState.addLocalQueryTarget(queryView.targetId); viewSnapshot = queryView.view.computeInitialSnapshot(); } else { - const targetData = await localStoreAllocateTarget( - syncEngineImpl.localStore, - queryToTarget(query) + viewSnapshot = await allocateTargetAndMaybeListen( + syncEngineImpl, + query, + shouldListenToRemote, + /** shouldInitializeView= */ true ); + debugAssert(!!viewSnapshot, 'viewSnapshot is not initialized'); + } - const status = syncEngineImpl.sharedClientState.addLocalQueryTarget( - targetData.targetId - ); - targetId = targetData.targetId; + return viewSnapshot; +} + +/** Query has been listening to the cache, and tries to initiate the remote store listen */ +export async function triggerRemoteStoreListen( + syncEngine: SyncEngine, + query: Query +): Promise { + const syncEngineImpl = ensureWatchCallbacks(syncEngine); + await allocateTargetAndMaybeListen( + syncEngineImpl, + query, + /** shouldListenToRemote= */ true, + /** shouldInitializeView= */ false + ); +} + +async function allocateTargetAndMaybeListen( + syncEngineImpl: SyncEngineImpl, + query: Query, + shouldListenToRemote: boolean, + shouldInitializeView: boolean +): Promise { + const targetData = await localStoreAllocateTarget( + syncEngineImpl.localStore, + queryToTarget(query) + ); + + const targetId = targetData.targetId; + + // PORTING NOTE: When the query is listening to cache only, we skip sending it over to Watch by + // not registering it in shared client state, and directly calculate initial snapshots and + // subsequent updates from cache. Otherwise, register the target ID with local Firestore client + // as active watch target. + const status: QueryTargetState = shouldListenToRemote + ? syncEngineImpl.sharedClientState.addLocalQueryTarget(targetId) + : 'not-current'; + + let viewSnapshot; + if (shouldInitializeView) { viewSnapshot = await initializeViewAndComputeSnapshot( syncEngineImpl, query, @@ -327,10 +366,10 @@ export async function syncEngineListen( status === 'current', targetData.resumeToken ); + } - if (syncEngineImpl.isPrimaryClient) { - remoteStoreListen(syncEngineImpl.remoteStore, targetData); - } + if (syncEngineImpl.isPrimaryClient && shouldListenToRemote) { + remoteStoreListen(syncEngineImpl.remoteStore, targetData); } return viewSnapshot; @@ -393,7 +432,8 @@ async function initializeViewAndComputeSnapshot( /** Stops listening to the query. */ export async function syncEngineUnlisten( syncEngine: SyncEngine, - query: Query + query: Query, + shouldUnlistenToRemote: boolean ): Promise { const syncEngineImpl = debugCast(syncEngine, SyncEngineImpl); const queryView = syncEngineImpl.queryViewsByQuery.get(query)!; @@ -430,7 +470,9 @@ export async function syncEngineUnlisten( ) .then(() => { syncEngineImpl.sharedClientState.clearQueryState(queryView.targetId); - remoteStoreUnlisten(syncEngineImpl.remoteStore, queryView.targetId); + if (shouldUnlistenToRemote) { + remoteStoreUnlisten(syncEngineImpl.remoteStore, queryView.targetId); + } removeAndCleanupTarget(syncEngineImpl, queryView.targetId); }) .catch(ignoreIfPrimaryLeaseLoss); @@ -445,6 +487,28 @@ export async function syncEngineUnlisten( } } +/** Unlistens to the remote store while still listening to the cache. */ +export async function triggerRemoteStoreUnlisten( + syncEngine: SyncEngine, + query: Query +): Promise { + const syncEngineImpl = debugCast(syncEngine, SyncEngineImpl); + const queryView = syncEngineImpl.queryViewsByQuery.get(query)!; + debugAssert( + !!queryView, + 'Trying to unlisten on query not found:' + stringifyQuery(query) + ); + const queries = syncEngineImpl.queriesByTarget.get(queryView.targetId)!; + + if (syncEngineImpl.isPrimaryClient && queries.length === 1) { + // PORTING NOTE: Unregister the target ID with local Firestore client as + // watch target. + syncEngineImpl.sharedClientState.removeLocalQueryTarget(queryView.targetId); + + remoteStoreUnlisten(syncEngineImpl.remoteStore, queryView.targetId); + } +} + /** * Initiates the write of local mutation batch which involves adding the * writes to the mutation queue, notifying the remote store about new @@ -1504,8 +1568,12 @@ export async function syncEngineApplyActiveTargetsChange( } for (const targetId of added) { - if (syncEngineImpl.queriesByTarget.has(targetId)) { - // A target might have been added in a previous attempt + // A target is already listening to remote store if it is already registered to + // sharedClientState. + const targetAlreadyListeningToRemoteStore = + syncEngineImpl.queriesByTarget.has(targetId) && + syncEngineImpl.sharedClientState.isActiveQueryTarget(targetId); + if (targetAlreadyListeningToRemoteStore) { logDebug(LOG_TAG, 'Adding an already active target ' + targetId); continue; } diff --git a/packages/firestore/src/local/shared_client_state.ts b/packages/firestore/src/local/shared_client_state.ts index 67a528957b4..0d0e430c12f 100644 --- a/packages/firestore/src/local/shared_client_state.ts +++ b/packages/firestore/src/local/shared_client_state.ts @@ -349,7 +349,7 @@ export class QueryTargetMetadata { /** * Metadata state of a single client denoting the query targets it is actively - * listening to. + * listening to the watch. */ // Visible for testing. export interface ClientState { diff --git a/packages/firestore/test/integration/api/query.test.ts b/packages/firestore/test/integration/api/query.test.ts index 7f1d10dd7c1..7468b39bfeb 100644 --- a/packages/firestore/test/integration/api/query.test.ts +++ b/packages/firestore/test/integration/api/query.test.ts @@ -2937,7 +2937,7 @@ apiDescribe('Hanging query issue - #7652', persistence => { } }); -function verifyDocumentChange( +export function verifyDocumentChange( change: DocumentChange, id: string, oldIndex: number, diff --git a/packages/firestore/test/integration/api/snasphot_listener_source.test.ts b/packages/firestore/test/integration/api/snasphot_listener_source.test.ts new file mode 100644 index 00000000000..39a93d61912 --- /dev/null +++ b/packages/firestore/test/integration/api/snasphot_listener_source.test.ts @@ -0,0 +1,786 @@ +/** + * @license + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { expect } from 'chai'; + +import { EventsAccumulator } from '../util/events_accumulator'; +import { + addDoc, + disableNetwork, + doc, + DocumentSnapshot, + enableNetwork, + getDoc, + getDocs, + limit, + limitToLast, + onSnapshot, + orderBy, + query, + QuerySnapshot, + runTransaction, + updateDoc, + where +} from '../util/firebase_export'; +import { + apiDescribe, + toDataArray, + withTestCollection, + withTestDocAndInitialData +} from '../util/helpers'; + +apiDescribe('Snapshot Listener source options ', persistence => { + // eslint-disable-next-line no-restricted-properties + (persistence.gc === 'lru' ? describe : describe.skip)( + 'listen to persistence cache', + () => { + it('can raise snapshot from cache for Query', () => { + const testDocs = { + a: { k: 'a' } + }; + return withTestCollection(persistence, testDocs, async coll => { + await getDocs(coll); // Populate the cache. + + const storeEvent = new EventsAccumulator(); + const unsubscribe = onSnapshot( + coll, + { source: 'cache' }, + storeEvent.storeEvent + ); + + const snapshot = await storeEvent.awaitEvent(); + expect(snapshot.metadata.fromCache).to.equal(true); + expect(toDataArray(snapshot)).to.deep.equal([{ k: 'a' }]); + + await storeEvent.assertNoAdditionalEvents(); + unsubscribe(); + }); + }); + + it('can raise snapshot from cache for DocumentReference', () => { + const testDocs = { k: 'a' }; + return withTestDocAndInitialData( + persistence, + testDocs, + async docRef => { + await getDoc(docRef); // Populate the cache. + + const storeEvent = new EventsAccumulator(); + const unsubscribe = onSnapshot( + docRef, + { source: 'cache' }, + storeEvent.storeEvent + ); + + const snapshot = await storeEvent.awaitEvent(); + expect(snapshot.metadata.fromCache).to.equal(true); + expect(snapshot.data()).to.deep.equal({ k: 'a' }); + + await storeEvent.assertNoAdditionalEvents(); + unsubscribe(); + } + ); + }); + + it('listen to cache would not be affected by online status change', () => { + const testDocs = { + a: { k: 'a' } + }; + return withTestCollection(persistence, testDocs, async (coll, db) => { + await getDocs(coll); // Populate the cache. + + const storeEvent = new EventsAccumulator(); + const unsubscribe = onSnapshot( + coll, + { includeMetadataChanges: true, source: 'cache' }, + storeEvent.storeEvent + ); + + const snapshot = await storeEvent.awaitEvent(); + expect(snapshot.metadata.fromCache).to.equal(true); + expect(toDataArray(snapshot)).to.deep.equal([{ k: 'a' }]); + + await disableNetwork(db); + await enableNetwork(db); + + await storeEvent.assertNoAdditionalEvents(); + unsubscribe(); + }); + }); + + it('multiple listeners sourced from cache can work independently', () => { + const testDocs = { + a: { k: 'a', sort: 0 }, + b: { k: 'b', sort: 1 } + }; + return withTestCollection(persistence, testDocs, async coll => { + await getDocs(coll); // Populate the cache. + const testQuery = query( + coll, + where('sort', '>', 0), + orderBy('sort', 'asc') + ); + + const storeEvent = new EventsAccumulator(); + const unsubscribe1 = onSnapshot( + testQuery, + { source: 'cache' }, + storeEvent.storeEvent + ); + + const unsubscribe2 = onSnapshot( + testQuery, + { source: 'cache' }, + storeEvent.storeEvent + ); + + let snapshots = await storeEvent.awaitEvents(2); + expect(toDataArray(snapshots[0])).to.deep.equal([ + { k: 'b', sort: 1 } + ]); + expect(snapshots[0].metadata).to.deep.equal(snapshots[1].metadata); + expect(toDataArray(snapshots[0])).to.deep.equal( + toDataArray(snapshots[1]) + ); + + await addDoc(coll, { k: 'c', sort: 2 }); + + snapshots = await storeEvent.awaitEvents(2); + expect(toDataArray(snapshots[0])).to.deep.equal([ + { k: 'b', sort: 1 }, + { k: 'c', sort: 2 } + ]); + expect(snapshots[0].metadata).to.deep.equal(snapshots[1].metadata); + expect(toDataArray(snapshots[0])).to.deep.equal( + toDataArray(snapshots[1]) + ); + + // Detach one listener, and do a local mutation. The other listener + // should not be affected. + unsubscribe1(); + + await addDoc(coll, { k: 'd', sort: 3 }); + + const snapshot = await storeEvent.awaitEvent(); + expect(snapshot.metadata.fromCache).to.equal(true); + expect(toDataArray(snapshot)).to.deep.equal([ + { k: 'b', sort: 1 }, + { k: 'c', sort: 2 }, + { k: 'd', sort: 3 } + ]); + await storeEvent.assertNoAdditionalEvents(); + unsubscribe2(); + }); + }); + + // Two queries that mapped to the same target ID are referred to as + // "mirror queries". An example for a mirror query is a limitToLast() + // query and a limit() query that share the same backend Target ID. + // Since limitToLast() queries are sent to the backend with a modified + // orderBy() clause, they can map to the same target representation as + // limit() query, even if both queries appear separate to the user. + it('can listen/un-listen/re-listen to mirror queries from cache', () => { + const testDocs = { + a: { k: 'a', sort: 0 }, + b: { k: 'b', sort: 1 }, + c: { k: 'c', sort: 1 } + }; + return withTestCollection(persistence, testDocs, async coll => { + await getDocs(coll); // Populate the cache. + + // Setup `limit` query + const storeLimitEvent = new EventsAccumulator(); + let limitUnlisten = onSnapshot( + query(coll, orderBy('sort', 'asc'), limit(2)), + { source: 'cache' }, + storeLimitEvent.storeEvent + ); + + // Setup mirroring `limitToLast` query + const storeLimitToLastEvent = new EventsAccumulator(); + let limitToLastUnlisten = onSnapshot( + query(coll, orderBy('sort', 'desc'), limitToLast(2)), + { source: 'cache' }, + storeLimitToLastEvent.storeEvent + ); + + // Verify both queries get expected results. + let snapshot = await storeLimitEvent.awaitEvent(); + expect(toDataArray(snapshot)).to.deep.equal([ + { k: 'a', sort: 0 }, + { k: 'b', sort: 1 } + ]); + snapshot = await storeLimitToLastEvent.awaitEvent(); + expect(toDataArray(snapshot)).to.deep.equal([ + { k: 'b', sort: 1 }, + { k: 'a', sort: 0 } + ]); + + // Un-listen then re-listen to the limit query. + limitUnlisten(); + limitUnlisten = onSnapshot( + query(coll, orderBy('sort', 'asc'), limit(2)), + { source: 'cache' }, + storeLimitEvent.storeEvent + ); + snapshot = await storeLimitEvent.awaitEvent(); + expect(toDataArray(snapshot)).to.deep.equal([ + { k: 'a', sort: 0 }, + { k: 'b', sort: 1 } + ]); + expect(snapshot.metadata.fromCache).to.equal(true); + + // Add a document that would change the result set. + await addDoc(coll, { k: 'd', sort: -1 }); + + // Verify both queries get expected results. + snapshot = await storeLimitEvent.awaitEvent(); + expect(toDataArray(snapshot)).to.deep.equal([ + { k: 'd', sort: -1 }, + { k: 'a', sort: 0 } + ]); + expect(snapshot.metadata.hasPendingWrites).to.equal(true); + + snapshot = await storeLimitToLastEvent.awaitEvent(); + expect(toDataArray(snapshot)).to.deep.equal([ + { k: 'a', sort: 0 }, + { k: 'd', sort: -1 } + ]); + expect(snapshot.metadata.hasPendingWrites).to.equal(true); + + // Un-listen to limitToLast, update a doc, then re-listen limitToLast. + limitToLastUnlisten(); + + await updateDoc(doc(coll, 'a'), { k: 'a', sort: -2 }); + limitToLastUnlisten = onSnapshot( + query(coll, orderBy('sort', 'desc'), limitToLast(2)), + { source: 'cache' }, + storeLimitToLastEvent.storeEvent + ); + + // Verify both queries get expected results. + snapshot = await storeLimitEvent.awaitEvent(); + expect(toDataArray(snapshot)).to.deep.equal([ + { k: 'a', sort: -2 }, + { k: 'd', sort: -1 } + ]); + expect(snapshot.metadata.hasPendingWrites).to.equal(true); + + snapshot = await storeLimitToLastEvent.awaitEvent(); + expect(toDataArray(snapshot)).to.deep.equal([ + { k: 'd', sort: -1 }, + { k: 'a', sort: -2 } + ]); + // We listened to LimitToLast query after the doc update. + expect(snapshot.metadata.hasPendingWrites).to.equal(false); + + limitUnlisten(); + limitToLastUnlisten(); + }); + }); + + it('can listen to default source first and then cache', () => { + const testDocs = { + a: { k: 'a', sort: 0 }, + b: { k: 'b', sort: 1 } + }; + return withTestCollection(persistence, testDocs, async coll => { + // Listen to the query with default options, which will also populates the cache + const storeDefaultEvent = new EventsAccumulator(); + const testQuery = query( + coll, + where('sort', '>=', 1), + orderBy('sort', 'asc') + ); + + const defaultUnlisten = onSnapshot( + testQuery, + storeDefaultEvent.storeEvent + ); + let snapshot = await storeDefaultEvent.awaitRemoteEvent(); + expect(toDataArray(snapshot)).to.deep.equal([{ k: 'b', sort: 1 }]); + expect(snapshot.metadata.fromCache).to.equal(false); + + // Listen to the same query from cache + const storeCacheEvent = new EventsAccumulator(); + const cacheUnlisten = onSnapshot( + testQuery, + { source: 'cache' }, + storeCacheEvent.storeEvent + ); + snapshot = await storeCacheEvent.awaitEvent(); + expect(toDataArray(snapshot)).to.deep.equal([{ k: 'b', sort: 1 }]); + // The metadata is sync with server due to the default listener + expect(snapshot.metadata.fromCache).to.equal(false); + + await storeDefaultEvent.assertNoAdditionalEvents(); + await storeCacheEvent.assertNoAdditionalEvents(); + + defaultUnlisten(); + cacheUnlisten(); + }); + }); + + it('can listen to cache source first and then default', () => { + const testDocs = { + a: { k: 'a', sort: 0 }, + b: { k: 'b', sort: 1 } + }; + return withTestCollection(persistence, testDocs, async coll => { + // Listen to the cache + const storeCacheEvent = new EventsAccumulator(); + const testQuery = query( + coll, + where('sort', '!=', 0), + orderBy('sort', 'asc') + ); + + const cacheUnlisten = onSnapshot( + testQuery, + { source: 'cache' }, + storeCacheEvent.storeEvent + ); + let snapshot = await storeCacheEvent.awaitEvent(); + // Cache is empty + expect(toDataArray(snapshot)).to.deep.equal([]); + expect(snapshot.metadata.fromCache).to.equal(true); + + // Listen to the same query from server + const storeDefaultEvent = new EventsAccumulator(); + const defaultUnlisten = onSnapshot( + testQuery, + storeDefaultEvent.storeEvent + ); + snapshot = await storeDefaultEvent.awaitEvent(); + const expectedData = [{ k: 'b', sort: 1 }]; + expect(toDataArray(snapshot)).to.deep.equal(expectedData); + expect(snapshot.metadata.fromCache).to.equal(false); + + // Default listener updates the cache, which triggers cache listener to raise snapshot. + snapshot = await storeCacheEvent.awaitEvent(); + expect(toDataArray(snapshot)).to.deep.equal(expectedData); + // The metadata is sync with server due to the default listener + expect(snapshot.metadata.fromCache).to.equal(false); + + await storeDefaultEvent.assertNoAdditionalEvents(); + await storeCacheEvent.assertNoAdditionalEvents(); + + defaultUnlisten(); + cacheUnlisten(); + }); + }); + + it('will not get metadata only updates if listening to cache only', () => { + const testDocs = { + a: { k: 'a', sort: 0 }, + b: { k: 'b', sort: 1 } + }; + return withTestCollection(persistence, testDocs, async coll => { + await getDocs(coll); // Populate the cache. + const testQuery = query( + coll, + where('sort', '!=', 0), + orderBy('sort', 'asc') + ); + + const storeEvent = new EventsAccumulator(); + const unsubscribe = onSnapshot( + testQuery, + { includeMetadataChanges: true, source: 'cache' }, + storeEvent.storeEvent + ); + + let snapshot = await storeEvent.awaitEvent(); + expect(snapshot.metadata.fromCache).to.equal(true); + expect(toDataArray(snapshot)).to.deep.equal([{ k: 'b', sort: 1 }]); + + await addDoc(coll, { k: 'c', sort: 2 }); + + snapshot = await storeEvent.awaitEvent(); + expect(snapshot.metadata.hasPendingWrites).to.equal(true); + expect(snapshot.metadata.fromCache).to.equal(true); + expect(toDataArray(snapshot)).to.deep.equal([ + { k: 'b', sort: 1 }, + { k: 'c', sort: 2 } + ]); + + // As we are not listening to server, the listener will not get notified + // when local mutation is acknowledged by server. + await storeEvent.assertNoAdditionalEvents(); + unsubscribe(); + }); + }); + + it('will have synced metadata updates when listening to both cache and default source', () => { + const testDocs = { + a: { k: 'a', sort: 0 }, + b: { k: 'b', sort: 1 } + }; + return withTestCollection(persistence, testDocs, async coll => { + await getDocs(coll); // Populate the cache. + const testQuery = query( + coll, + where('sort', '!=', 0), + orderBy('sort', 'asc') + ); + + // Listen to the query from cache + const storeCacheEvent = new EventsAccumulator(); + const cacheUnlisten = onSnapshot( + testQuery, + { includeMetadataChanges: true, source: 'cache' }, + storeCacheEvent.storeEvent + ); + let snapshot = await storeCacheEvent.awaitEvent(); + expect(toDataArray(snapshot)).to.deep.equal([{ k: 'b', sort: 1 }]); + expect(snapshot.metadata.fromCache).to.equal(true); + + // Listen to the same query from server + const storeDefaultEvent = new EventsAccumulator(); + const defaultUnlisten = onSnapshot( + testQuery, + { includeMetadataChanges: true }, + storeDefaultEvent.storeEvent + ); + snapshot = await storeDefaultEvent.awaitEvent(); + expect(toDataArray(snapshot)).to.deep.equal([{ k: 'b', sort: 1 }]); + // First snapshot will be raised from cache. + expect(snapshot.metadata.fromCache).to.equal(true); + snapshot = await storeDefaultEvent.awaitEvent(); + // Second snapshot will be raised from server result + expect(snapshot.metadata.fromCache).to.equal(false); + + // As listening to metadata changes, the cache listener also gets triggered and synced + // with default listener. + snapshot = await storeCacheEvent.awaitEvent(); + expect(snapshot.metadata.fromCache).to.equal(false); + + await addDoc(coll, { k: 'c', sort: 2 }); + + // snapshot gets triggered by local mutation + snapshot = await storeDefaultEvent.awaitEvent(); + const expectedData = [ + { k: 'b', sort: 1 }, + { k: 'c', sort: 2 } + ]; + expect(toDataArray(snapshot)).to.deep.equal(expectedData); + expect(snapshot.metadata.hasPendingWrites).to.equal(true); + expect(snapshot.metadata.fromCache).to.equal(false); + + snapshot = await storeCacheEvent.awaitEvent(); + expect(toDataArray(snapshot)).to.deep.equal(expectedData); + expect(snapshot.metadata.hasPendingWrites).to.equal(true); + expect(snapshot.metadata.fromCache).to.equal(false); + + // Local mutation gets acknowledged by the server + snapshot = await storeDefaultEvent.awaitEvent(); + expect(snapshot.metadata.hasPendingWrites).to.equal(false); + expect(snapshot.metadata.fromCache).to.equal(false); + + snapshot = await storeCacheEvent.awaitEvent(); + expect(snapshot.metadata.hasPendingWrites).to.equal(false); + expect(snapshot.metadata.fromCache).to.equal(false); + + cacheUnlisten(); + defaultUnlisten(); + }); + }); + + it('can un-listen to default source while still listening to cache', () => { + const testDocs = { + a: { k: 'a', sort: 0 }, + b: { k: 'b', sort: 1 } + }; + return withTestCollection(persistence, testDocs, async coll => { + const testQuery = query( + coll, + where('sort', '!=', 0), + orderBy('sort', 'asc') + ); + + // Listen to the query with both source options + const storeDefaultEvent = new EventsAccumulator(); + const defaultUnlisten = onSnapshot( + testQuery, + storeDefaultEvent.storeEvent + ); + await storeDefaultEvent.awaitEvent(); + const storeCacheEvent = new EventsAccumulator(); + const cacheUnlisten = onSnapshot( + testQuery, + { source: 'cache' }, + storeCacheEvent.storeEvent + ); + await storeCacheEvent.awaitEvent(); + + // Un-listen to the default listener. + defaultUnlisten(); + await storeDefaultEvent.assertNoAdditionalEvents(); + + // Add a document and verify listener to cache works as expected + await addDoc(coll, { k: 'c', sort: -1 }); + + const snapshot = await storeCacheEvent.awaitEvent(); + expect(toDataArray(snapshot)).to.deep.equal([ + { k: 'c', sort: -1 }, + { k: 'b', sort: 1 } + ]); + + await storeCacheEvent.assertNoAdditionalEvents(); + cacheUnlisten(); + }); + }); + + it('can un-listen to cache while still listening to server', () => { + const testDocs = { + a: { k: 'a', sort: 0 }, + b: { k: 'b', sort: 1 } + }; + return withTestCollection(persistence, testDocs, async coll => { + const testQuery = query( + coll, + where('sort', '!=', 0), + orderBy('sort', 'asc') + ); + + // Listen to the query with both source options + const storeDefaultEvent = new EventsAccumulator(); + const defaultUnlisten = onSnapshot( + testQuery, + storeDefaultEvent.storeEvent + ); + await storeDefaultEvent.awaitEvent(); + const storeCacheEvent = new EventsAccumulator(); + const cacheUnlisten = onSnapshot( + testQuery, + { source: 'cache' }, + storeCacheEvent.storeEvent + ); + await storeCacheEvent.awaitEvent(); + + // Un-listen to cache. + cacheUnlisten(); + await storeCacheEvent.assertNoAdditionalEvents(); + + // Add a document and verify listener to server works as expected. + await addDoc(coll, { k: 'c', sort: -1 }); + + const snapshot = await storeDefaultEvent.awaitEvent(); + expect(toDataArray(snapshot)).to.deep.equal([ + { k: 'c', sort: -1 }, + { k: 'b', sort: 1 } + ]); + + await storeDefaultEvent.assertNoAdditionalEvents(); + defaultUnlisten(); + }); + }); + + it('can listen/un-listen/re-listen to same query with different source options', () => { + const testDocs = { + a: { k: 'a', sort: 0 }, + b: { k: 'b', sort: 1 } + }; + return withTestCollection(persistence, testDocs, async coll => { + const testQuery = query( + coll, + where('sort', '>', 0), + orderBy('sort', 'asc') + ); + + // Listen to the query with default options, which also populates the cache + const storeDefaultEvent = new EventsAccumulator(); + let defaultUnlisten = onSnapshot( + testQuery, + storeDefaultEvent.storeEvent + ); + let snapshot = await storeDefaultEvent.awaitEvent(); + let expectedData = [{ k: 'b', sort: 1 }]; + expect(toDataArray(snapshot)).to.deep.equal(expectedData); + + // Listen to the same query from cache + const storeCacheEvent = new EventsAccumulator(); + let cacheUnlisten = onSnapshot( + testQuery, + { source: 'cache' }, + storeCacheEvent.storeEvent + ); + snapshot = await storeCacheEvent.awaitEvent(); + expect(toDataArray(snapshot)).to.deep.equal(expectedData); + + // Un-listen to the default listener, add a doc and re-listen. + defaultUnlisten(); + await addDoc(coll, { k: 'c', sort: 2 }); + + snapshot = await storeCacheEvent.awaitEvent(); + expectedData = [ + { k: 'b', sort: 1 }, + { k: 'c', sort: 2 } + ]; + expect(toDataArray(snapshot)).to.deep.equal(expectedData); + + defaultUnlisten = onSnapshot(testQuery, storeDefaultEvent.storeEvent); + snapshot = await storeDefaultEvent.awaitEvent(); + expect(toDataArray(snapshot)).to.deep.equal(expectedData); + + // Un-listen to cache, update a doc, then re-listen to cache. + cacheUnlisten(); + await updateDoc(doc(coll, 'b'), { k: 'b', sort: 3 }); + + snapshot = await storeDefaultEvent.awaitEvent(); + expectedData = [ + { k: 'c', sort: 2 }, + { k: 'b', sort: 3 } + ]; + expect(toDataArray(snapshot)).to.deep.equal(expectedData); + + cacheUnlisten = onSnapshot( + testQuery, + { source: 'cache' }, + storeCacheEvent.storeEvent + ); + + snapshot = await storeCacheEvent.awaitEvent(); + expect(toDataArray(snapshot)).to.deep.equal(expectedData); + + defaultUnlisten(); + cacheUnlisten(); + }); + }); + + it('can listen to composite index queries from cache', () => { + const testDocs = { + a: { k: 'a', sort: 0 }, + b: { k: 'b', sort: 1 } + }; + return withTestCollection(persistence, testDocs, async coll => { + await getDocs(coll); // Populate the cache. + + const testQuery = query( + coll, + where('k', '<=', 'a'), + where('sort', '>=', 0) + ); + const storeEvent = new EventsAccumulator(); + const unsubscribe = onSnapshot( + testQuery, + { source: 'cache' }, + storeEvent.storeEvent + ); + + const snapshot = await storeEvent.awaitEvent(); + expect(toDataArray(snapshot)).to.deep.equal([{ k: 'a', sort: 0 }]); + unsubscribe(); + }); + }); + + it('can raise initial snapshot from cache, even if it is empty', () => { + return withTestCollection(persistence, {}, async coll => { + let snapshot = await getDocs(coll); // Populate the cache. + expect(toDataArray(snapshot)).to.deep.equal([]); // Precondition check. + + const storeEvent = new EventsAccumulator(); + onSnapshot(coll, { source: 'cache' }, storeEvent.storeEvent); + snapshot = await storeEvent.awaitEvent(); + expect(snapshot.metadata.fromCache).to.be.true; + expect(toDataArray(snapshot)).to.deep.equal([]); + }); + }); + + it('will not be triggered by transactions while listening to cache', () => { + return withTestCollection(persistence, {}, async (coll, db) => { + const accumulator = new EventsAccumulator(); + const unsubscribe = onSnapshot( + coll, + { source: 'cache' }, + accumulator.storeEvent + ); + + const snapshot = await accumulator.awaitEvent(); + expect(toDataArray(snapshot)).to.deep.equal([]); + + const docRef = doc(coll); + // Use a transaction to perform a write without triggering any local events. + await runTransaction(db, async txn => { + txn.set(docRef, { k: 'a' }); + }); + + // There should be no events raised + await accumulator.assertNoAdditionalEvents(); + unsubscribe(); + }); + }); + + it('share server side updates when listening to both cache and default', () => { + const testDocs = { + a: { k: 'a', sort: 0 }, + b: { k: 'b', sort: 1 } + }; + return withTestCollection(persistence, testDocs, async (coll, db) => { + const testQuery = query( + coll, + where('sort', '>', 0), + orderBy('sort', 'asc') + ); + + // Listen to the query with default options, which will also populates the cache + const storeDefaultEvent = new EventsAccumulator(); + const defaultUnlisten = onSnapshot( + testQuery, + storeDefaultEvent.storeEvent + ); + let snapshot = await storeDefaultEvent.awaitRemoteEvent(); + let expectedData = [{ k: 'b', sort: 1 }]; + expect(toDataArray(snapshot)).to.deep.equal(expectedData); + + // Listen to the same query from cache + const storeCacheEvent = new EventsAccumulator(); + const cacheUnlisten = onSnapshot( + testQuery, + { source: 'cache' }, + storeCacheEvent.storeEvent + ); + snapshot = await storeCacheEvent.awaitEvent(); + expect(toDataArray(snapshot)).to.deep.equal(expectedData); + + // Use a transaction to mock server side updates + const docRef = doc(coll); + await runTransaction(db, async txn => { + txn.set(docRef, { k: 'c', sort: 2 }); + }); + + // Default listener receives the server update + snapshot = await storeDefaultEvent.awaitRemoteEvent(); + expectedData = [ + { k: 'b', sort: 1 }, + { k: 'c', sort: 2 } + ]; + expect(toDataArray(snapshot)).to.deep.equal(expectedData); + expect(snapshot.metadata.fromCache).to.be.false; + + // Cache listener raises snapshot as well + snapshot = await storeCacheEvent.awaitEvent(); + expect(toDataArray(snapshot)).to.deep.equal(expectedData); + expect(snapshot.metadata.fromCache).to.be.false; + + defaultUnlisten(); + cacheUnlisten(); + }); + }); + } + ); +}); diff --git a/packages/firestore/test/unit/core/event_manager.test.ts b/packages/firestore/test/unit/core/event_manager.test.ts index 293514ceb6d..e6752437ac5 100644 --- a/packages/firestore/test/unit/core/event_manager.test.ts +++ b/packages/firestore/test/unit/core/event_manager.test.ts @@ -25,7 +25,8 @@ import { newEventManager, eventManagerOnWatchChange, QueryListener, - eventManagerOnOnlineStateChange + eventManagerOnOnlineStateChange, + EventManager } from '../../../src/core/event_manager'; import { Query } from '../../../src/core/query'; import { OnlineState } from '../../../src/core/types'; @@ -50,29 +51,45 @@ describe('EventManager', () => { function fakeQueryListener(query: Query): any { return { query, + options: {}, onViewSnapshot: () => {}, onError: () => {}, - applyOnlineStateChange: () => {} + applyOnlineStateChange: () => {}, + listensToRemoteStore: () => {} }; } // mock objects. - // eslint-disable-next-line @typescript-eslint/no-explicit-any - let onListenSpy: any, onUnlistenSpy: any; + /* eslint-disable @typescript-eslint/no-explicit-any */ + let onListenSpy: any, + onUnlistenSpy: any, + onFirstRemoteStoreListenSpy: any, + onLastRemoteStoreUnlistenSpy: any; + /* eslint-enable @typescript-eslint/no-explicit-any */ beforeEach(() => { onListenSpy = sinon.stub().returns(Promise.resolve(0)); onUnlistenSpy = sinon.spy(); + onFirstRemoteStoreListenSpy = sinon.spy(); + onLastRemoteStoreUnlistenSpy = sinon.spy(); }); + function eventManagerBindSpy(eventManager: EventManager): void { + eventManager.onListen = onListenSpy.bind(null); + eventManager.onUnlisten = onUnlistenSpy.bind(null); + eventManager.onFirstRemoteStoreListen = + onFirstRemoteStoreListenSpy.bind(null); + eventManager.onLastRemoteStoreUnlisten = + onLastRemoteStoreUnlistenSpy.bind(null); + } + it('handles many listenables per query', async () => { const query1 = query('foo/bar'); const fakeListener1 = fakeQueryListener(query1); const fakeListener2 = fakeQueryListener(query1); const eventManager = newEventManager(); - eventManager.onListen = onListenSpy.bind(null); - eventManager.onUnlisten = onUnlistenSpy.bind(null); + eventManagerBindSpy(eventManager); await eventManagerListen(eventManager, fakeListener1); expect(onListenSpy.calledWith(query1)).to.be.true; @@ -92,8 +109,7 @@ describe('EventManager', () => { const fakeListener1 = fakeQueryListener(query1); const eventManager = newEventManager(); - eventManager.onListen = onListenSpy.bind(null); - eventManager.onUnlisten = onUnlistenSpy.bind(null); + eventManagerBindSpy(eventManager); await eventManagerUnlisten(eventManager, fakeListener1); expect(onUnlistenSpy.callCount).to.equal(0); @@ -118,8 +134,7 @@ describe('EventManager', () => { }; const eventManager = newEventManager(); - eventManager.onListen = onListenSpy.bind(null); - eventManager.onUnlisten = onUnlistenSpy.bind(null); + eventManagerBindSpy(eventManager); await eventManagerListen(eventManager, fakeListener1); await eventManagerListen(eventManager, fakeListener2); @@ -150,8 +165,7 @@ describe('EventManager', () => { }; const eventManager = newEventManager(); - eventManager.onListen = onListenSpy.bind(null); - eventManager.onUnlisten = onUnlistenSpy.bind(null); + eventManagerBindSpy(eventManager); await eventManagerListen(eventManager, fakeListener1); expect(events).to.deep.equal([OnlineState.Unknown]); diff --git a/packages/firestore/test/unit/specs/bundle_spec.test.ts b/packages/firestore/test/unit/specs/bundle_spec.test.ts index 2e63799d80d..5a88dc8691c 100644 --- a/packages/firestore/test/unit/specs/bundle_spec.test.ts +++ b/packages/firestore/test/unit/specs/bundle_spec.test.ts @@ -52,7 +52,7 @@ interface TestBundledQuery { limitType?: LimitType; } -function bundleWithDocumentAndQuery( +export function bundleWithDocumentAndQuery( testDoc: TestBundleDocument, testQuery?: TestBundledQuery ): string { diff --git a/packages/firestore/test/unit/specs/listen_source_spec.test.ts b/packages/firestore/test/unit/specs/listen_source_spec.test.ts new file mode 100644 index 00000000000..ee2a9cf5944 --- /dev/null +++ b/packages/firestore/test/unit/specs/listen_source_spec.test.ts @@ -0,0 +1,843 @@ +/** + * @license + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { LimitType, queryWithLimit } from '../../../src/core/query'; +import { doc, filter, orderBy, query } from '../../util/helpers'; + +import { bundleWithDocumentAndQuery } from './bundle_spec.test'; +import { describeSpec, specTest } from './describe_spec'; +import { client, spec } from './spec_builder'; + +describeSpec('Listens source options:', [], () => { + specTest( + 'Contents of query are cleared when listen is removed.', + ['eager-gc'], + 'Explicitly tests eager GC behavior', + () => { + const testQuery = query('collection'); + const docA = doc('collection/a', 0, { key: 'a' }).setHasLocalMutations(); + return ( + spec() + .userSets('collection/a', { key: 'a' }) + .userListensToCache(testQuery) + .expectEvents(testQuery, { + added: [docA], + hasPendingWrites: true, + fromCache: true + }) + .userUnlistensToCache(testQuery) + .writeAcks('collection/a', 1000) + // Cache is empty as docA is GCed. + .userListensToCache(testQuery) + .expectEvents(testQuery, { added: [], fromCache: true }) + ); + } + ); + + specTest( + 'Documents are cleared when listen is removed.', + ['eager-gc'], + () => { + const filteredQuery = query('collection', filter('matches', '==', true)); + const unfilteredQuery = query('collection'); + const docA = doc('collection/a', 0, { + matches: true + }).setHasLocalMutations(); + const docB = doc('collection/b', 0, { + matches: true + }).setHasLocalMutations(); + return ( + spec() + .userSets('collection/a', { matches: true }) + .userSets('collection/b', { matches: true }) + .userListensToCache(filteredQuery) + .expectEvents(filteredQuery, { + added: [docA, docB], + hasPendingWrites: true, + fromCache: true + }) + .writeAcks('collection/a', 1000) + .writeAcks('collection/b', 2000) + .userSets('collection/b', { matches: false }) + // DocB doesn't match because of a pending mutation + .expectEvents(filteredQuery, { + removed: [docB], + hasPendingWrites: true, + fromCache: true + }) + .userUnlistensToCache(filteredQuery) + .writeAcks('collection/b', 3000) + // Should get no events since documents were GCed + .userListensToCache(unfilteredQuery) + .expectEvents(unfilteredQuery, { added: [], fromCache: true }) + .userUnlistensToCache(unfilteredQuery) + ); + } + ); + + specTest("Doesn't include unknown documents in cached result", [], () => { + const testQuery = query('collection'); + const existingDoc = doc('collection/exists', 0, { + key: 'a' + }).setHasLocalMutations(); + return spec() + .userSets('collection/exists', { key: 'a' }) + .userPatches('collection/unknown', { key: 'b' }) + .userListensToCache(testQuery) + .expectEvents(testQuery, { + added: [existingDoc], + fromCache: true, + hasPendingWrites: true + }); + }); + + specTest("Doesn't raise 'hasPendingWrites' for deletes", [], () => { + const testQuery = query('collection'); + const docA = doc('collection/a', 1000, { key: 'a' }); + + return ( + spec() + .ensureManualLruGC() + // Populate the cache first + .userListens(testQuery) + .watchAcksFull(testQuery, 1000, docA) + .expectEvents(testQuery, { added: [docA] }) + .userUnlistens(testQuery) + .watchRemoves(testQuery) + // Listen to cache + .userListensToCache(testQuery) + .expectEvents(testQuery, { added: [docA], fromCache: true }) + .userDeletes('collection/a') + .expectEvents(testQuery, { removed: [docA], fromCache: true }) + .writeAcks('collection/a', 2000) + ); + }); + + specTest('onSnapshotsInSync fires for multiple listeners', [], () => { + const query1 = query('collection'); + const docAv1 = doc('collection/a', 1000, { v: 1 }); + const docAv2Local = doc('collection/a', 1000, { + v: 2 + }).setHasLocalMutations(); + const docAv3Local = doc('collection/a', 1000, { + v: 3 + }).setHasLocalMutations(); + const docAv4Local = doc('collection/a', 1000, { + v: 4 + }).setHasLocalMutations(); + + return ( + spec() + .ensureManualLruGC() + // Populate the cache first + .userListens(query1) + .watchAcksFull(query1, 1000, docAv1) + .expectEvents(query1, { added: [docAv1] }) + .userUnlistens(query1) + .watchRemoves(query1) + // Listen to cache + .userListensToCache(query1) + .expectEvents(query1, { added: [docAv1], fromCache: true }) + .userAddsSnapshotsInSyncListener() + .expectSnapshotsInSyncEvent() + .userSets('collection/a', { v: 2 }) + .expectEvents(query1, { + hasPendingWrites: true, + modified: [docAv2Local], + fromCache: true + }) + .expectSnapshotsInSyncEvent() + .userAddsSnapshotsInSyncListener() + .expectSnapshotsInSyncEvent() + .userAddsSnapshotsInSyncListener() + .expectSnapshotsInSyncEvent() + .userSets('collection/a', { v: 3 }) + .expectEvents(query1, { + hasPendingWrites: true, + modified: [docAv3Local], + fromCache: true + }) + .expectSnapshotsInSyncEvent(3) + .userRemovesSnapshotsInSyncListener() + .userSets('collection/a', { v: 4 }) + .expectEvents(query1, { + hasPendingWrites: true, + modified: [docAv4Local], + fromCache: true + }) + .expectSnapshotsInSyncEvent(2) + ); + }); + + specTest('Empty initial snapshot is raised from cache', [], () => { + const query1 = query('collection'); + return ( + spec() + .ensureManualLruGC() + // Populate the cache with the empty query results. + .userListens(query1) + .watchAcksFull(query1, 1000) + .expectEvents(query1, { fromCache: false }) + .userUnlistens(query1) + .watchRemoves(query1) + // Listen to the query again and verify that the empty snapshot is + // raised from cache. + .userListensToCache(query1, { resumeToken: 'resume-token-1000' }) + .expectEvents(query1, { fromCache: true }) + ); + }); + + specTest( + 'Empty-due-to-delete initial snapshot is raised from cache', + [], + () => { + const query1 = query('collection'); + const doc1 = doc('collection/a', 1000, { v: 1 }); + return ( + spec() + .ensureManualLruGC() + // Populate the cache with the empty query results. + .userListens(query1) + .watchAcksFull(query1, 1000, doc1) + .expectEvents(query1, { added: [doc1] }) + .userUnlistens(query1) + .watchRemoves(query1) + // Delete the only document in the result set locally on the client. + .userDeletes('collection/a') + // Listen to the query again and verify that the empty snapshot is + // raised from cache, even though the write is not yet acknowledged. + .userListensToCache(query1, { resumeToken: 'resume-token-1000' }) + .expectEvents(query1, { fromCache: true }) + ); + } + ); + + specTest('Newer docs from bundles should overwrite cache', [], () => { + const query1 = query('collection'); + const docA = doc('collection/a', 1000, { value: 'a' }); + const docAChanged = doc('collection/a', 2999, { value: 'b' }); + + const bundleString = bundleWithDocumentAndQuery({ + key: docA.key, + readTime: 3000, + createTime: 1999, + updateTime: 2999, + content: { value: 'b' } + }); + + return ( + spec() + .ensureManualLruGC() + // Populate the cache first + .userListens(query1) + .watchAcksFull(query1, 1000, docA) + .expectEvents(query1, { added: [docA] }) + .userUnlistens(query1) + .watchRemoves(query1) + // Listen to cache + .userListensToCache(query1) + .expectEvents(query1, { added: [docA], fromCache: true }) + .loadBundle(bundleString) + .expectEvents(query1, { modified: [docAChanged], fromCache: true }) + ); + }); + + specTest( + 'Newer deleted docs from bundles should delete cached docs', + [], + () => { + const query1 = query('collection'); + const docA = doc('collection/a', 1000, { value: 'a' }); + const bundleString = bundleWithDocumentAndQuery({ + key: docA.key, + readTime: 3000 + }); + + return ( + spec() + .ensureManualLruGC() + // Populate the cache first + .userListens(query1) + .watchAcksFull(query1, 1000, docA) + .expectEvents(query1, { added: [docA] }) + .userUnlistens(query1) + .watchRemoves(query1) + // Listen to cache + .userListensToCache(query1) + .expectEvents(query1, { added: [docA], fromCache: true }) + .loadBundle(bundleString) + .expectEvents(query1, { removed: [docA], fromCache: true }) + ); + } + ); + + specTest('Older deleted docs from bundles should do nothing', [], () => { + const query1 = query('collection'); + const docA = doc('collection/a', 1000, { value: 'a' }); + const bundleString = bundleWithDocumentAndQuery({ + key: docA.key, + readTime: 999 + }); + + return ( + spec() + .ensureManualLruGC() + // Populate the cache first + .userListens(query1) + .watchAcksFull(query1, 1000, docA) + .expectEvents(query1, { added: [docA] }) + .userUnlistens(query1) + .watchRemoves(query1) + // Listen to cache + .userListensToCache(query1) + .expectEvents(query1, { added: [docA], fromCache: true }) + // No events are expected here. + .loadBundle(bundleString) + ); + }); + + specTest( + 'Newer docs from bundles should keep not raise snapshot if there are unacknowledged writes', + [], + () => { + const query1 = query('collection'); + const docA = doc('collection/a', 250, { value: 'a' }); + const bundleString = bundleWithDocumentAndQuery({ + key: docA.key, + readTime: 1001, + createTime: 250, + updateTime: 1001, + content: { value: 'fromBundle' } + }); + + return ( + spec() + .ensureManualLruGC() + // Populate the cache first + .userListens(query1) + .watchAcksFull(query1, 250, docA) + .expectEvents(query1, { added: [docA] }) + .userUnlistens(query1) + .watchRemoves(query1) + // Listen to cache + .userListensToCache(query1) + .expectEvents(query1, { + added: [doc('collection/a', 250, { value: 'a' })], + fromCache: true + }) + .userPatches('collection/a', { value: 'patched' }) + .expectEvents(query1, { + modified: [ + doc('collection/a', 250, { + value: 'patched' + }).setHasLocalMutations() + ], + hasPendingWrites: true, + fromCache: true + }) + // Loading the bundle will not raise snapshots, because the + // mutation has not been acknowledged. + .loadBundle(bundleString) + ); + } + ); + + specTest( + 'Newer docs from bundles should raise snapshot only when Watch catches up with acknowledged writes', + [], + () => { + const query1 = query('collection'); + const docA = doc('collection/a', 250, { value: 'a' }); + const bundleBeforeMutationAck = bundleWithDocumentAndQuery({ + key: docA.key, + readTime: 500, + createTime: 250, + updateTime: 500, + content: { value: 'b' } + }); + + const bundleAfterMutationAck = bundleWithDocumentAndQuery({ + key: docA.key, + readTime: 1001, + createTime: 250, + updateTime: 1001, + content: { value: 'fromBundle' } + }); + return ( + spec() + .ensureManualLruGC() + // Populate the cache first + .userListens(query1) + .watchAcksFull(query1, 250, docA) + .expectEvents(query1, { added: [docA] }) + .userUnlistens(query1) + .watchRemoves(query1) + // Listen to cache + .userListensToCache(query1) + .expectEvents(query1, { + added: [doc('collection/a', 250, { value: 'a' })], + fromCache: true + }) + .userPatches('collection/a', { value: 'patched' }) + .expectEvents(query1, { + modified: [ + doc('collection/a', 250, { + value: 'patched' + }).setHasLocalMutations() + ], + hasPendingWrites: true, + fromCache: true + }) + .writeAcks('collection/a', 1000) + // loading bundleBeforeMutationAck will not raise snapshots, because its + // snapshot version is older than the acknowledged mutation. + .loadBundle(bundleBeforeMutationAck) + // loading bundleAfterMutationAck will raise a snapshot, because it is after + // the acknowledged mutation. + .loadBundle(bundleAfterMutationAck) + .expectEvents(query1, { + modified: [doc('collection/a', 1001, { value: 'fromBundle' })], + fromCache: true + }) + ); + } + ); + + specTest( + 'Primary client should not invoke watch request while all clients are listening to cache', + ['multi-client'], + () => { + const query1 = query('collection'); + + return ( + client(0) + .becomeVisible() + .client(1) + .userListensToCache(query1) + .expectEvents(query1, { added: [], fromCache: true }) + // Primary client should not invoke watch request for cache listeners + .client(0) + .expectListenToCache(query1) + .expectActiveTargets() + .client(1) + .userUnlistensToCache(query1) + .client(0) + .expectUnlistenToCache(query1) + ); + } + ); + + specTest( + 'Local mutations notifies listeners sourced from cache in all tabs', + ['multi-client'], + () => { + const query1 = query('collection'); + const docA = doc('collection/a', 0, { + key: 'a' + }).setHasLocalMutations(); + + return client(0) + .becomeVisible() + .userListensToCache(query1) + .expectEvents(query1, { added: [], fromCache: true }) + .client(1) + .userListensToCache(query1) + .expectEvents(query1, { added: [], fromCache: true }) + .client(0) + .userSets('collection/a', { key: 'a' }) + .expectEvents(query1, { + hasPendingWrites: true, + added: [docA], + fromCache: true + }) + .client(1) + .expectEvents(query1, { + hasPendingWrites: true, + added: [docA], + fromCache: true + }); + } + ); + + specTest( + 'Listeners with different source shares watch changes between primary and secondary clients', + ['multi-client'], + () => { + const query1 = query('collection'); + const docA = doc('collection/a', 1000, { key: 'a' }); + const docB = doc('collection/b', 2000, { key: 'a' }); + + return ( + client(0) + .becomeVisible() + // Listen to server in the primary client + .userListens(query1) + .watchAcksFull(query1, 1000, docA) + .expectEvents(query1, { added: [docA] }) + // Listen to cache in secondary clients + .client(1) + .userListensToCache(query1) + .expectEvents(query1, { added: [docA], fromCache: true }) + .client(2) + .userListensToCache(query1) + .expectEvents(query1, { added: [docA], fromCache: true }) + // Updates in the primary client notifies listeners sourcing from cache + // in secondary clients. + .client(0) + .watchSends({ affects: [query1] }, docB) + .watchSnapshots(2000) + .expectEvents(query1, { added: [docB] }) + .client(1) + .expectEvents(query1, { added: [docB] }) + .client(2) + .expectEvents(query1, { added: [docB] }) + // Un-listen to the server in the primary tab. + .client(0) + .userUnlistens(query1) + // There should be no active watch targets left. + .expectActiveTargets() + ); + } + ); + + specTest( + 'Clients can have multiple listeners with different sources', + ['multi-client'], + () => { + const query1 = query('collection'); + const docA = doc('collection/a', 1000, { key: 'a' }); + const docB = doc('collection/b', 2000, { key: 'a' }); + + return ( + client(0) + .becomeVisible() + // Listen to both server and cache in the primary client + .userListens(query1) + .watchAcksFull(query1, 1000, docA) + .expectEvents(query1, { added: [docA] }) + .userListensToCache(query1) + .expectEvents(query1, { added: [docA] }) + // Listen to both server and cache in the secondary client + .client(1) + .userListens(query1) + .expectEvents(query1, { added: [docA] }) + .userListensToCache(query1) + .expectEvents(query1, { added: [docA] }) + // Updates in the primary client notifies all listeners + .client(0) + .watchSends({ affects: [query1] }, docB) + .watchSnapshots(2000) + .expectEvents(query1, { added: [docB] }) + .expectEvents(query1, { added: [docB] }) + .client(1) + .expectEvents(query1, { added: [docB] }) + .expectEvents(query1, { added: [docB] }) + ); + } + ); + + specTest( + 'Query is executed by primary client even if primary client only has listeners sourced from cache', + ['multi-client'], + () => { + const query1 = query('collection'); + const docA = doc('collection/a', 1000, { key: 'a' }); + + return ( + client(0) + .becomeVisible() + // Listen to cache in primary client + .userListensToCache(query1) + .expectEvents(query1, { added: [], fromCache: true }) + // Listen to server in secondary client + .client(1) + .userListens(query1) + // The query is executed in the primary client + .client(0) + .expectListen(query1) + // Updates in the primary client notifies both listeners + .watchAcks(query1) + .watchSends({ affects: [query1] }, docA) + .watchSnapshots(1000) + .expectEvents(query1, { added: [docA], fromCache: true }) + .client(1) + .expectEvents(query1, { added: [docA], fromCache: true }) + .client(0) + .watchCurrents(query1, 'resume-token-2000') + .watchSnapshots(2000) + // Listeners in both tabs are in sync + .expectEvents(query1, { fromCache: false }) + .client(1) + .expectEvents(query1, { fromCache: false }) + ); + } + ); + + specTest( + 'Query only raises events in participating clients', + ['multi-client'], + () => { + const query1 = query('collection'); + const docA = doc('collection/a', 1000, { key: 'a' }); + + return client(0) + .becomeVisible() + .client(1) + .client(2) + .userListensToCache(query1) + .expectEvents(query1, { added: [], fromCache: true }) + .client(3) + .userListens(query1) + .client(0) // No events + .expectListen(query1) + .watchAcksFull(query1, 1000, docA) + .client(1) // No events + .client(2) + .expectEvents(query1, { added: [docA] }) + .client(3) + .expectEvents(query1, { added: [docA] }); + } + ); + + specTest( + 'Mirror queries being listened in different clients sourced from cache ', + ['multi-client'], + () => { + const fullQuery = query('collection'); + const limit = queryWithLimit( + query('collection', orderBy('sort', 'asc')), + 2, + LimitType.First + ); + const limitToLast = queryWithLimit( + query('collection', orderBy('sort', 'desc')), + 2, + LimitType.Last + ); + const docA = doc('collection/a', 1000, { sort: 0 }); + const docB = doc('collection/b', 1000, { sort: 1 }); + const docC = doc('collection/c', 1000, { sort: 1 }); + const docCV2 = doc('collection/c', 2000, { + sort: -1 + }).setHasLocalMutations(); + + return ( + client(0) + .becomeVisible() + // Populate the cache first + .userListens(fullQuery) + .watchAcksFull(fullQuery, 1000, docA, docB, docC) + .expectEvents(fullQuery, { added: [docA, docB, docC] }) + .userUnlistens(fullQuery) + .watchRemoves(fullQuery) + + // Listen to mirror queries from cache in 2 different tabs + .userListensToCache(limit) + .expectEvents(limit, { added: [docA, docB], fromCache: true }) + .client(1) + .userListensToCache(limitToLast) + .expectEvents(limitToLast, { added: [docB, docA], fromCache: true }) + // Un-listen to the query in primary tab and do a local mutation + .client(0) + .userUnlistensToCache(limit) + .userSets('collection/c', { sort: -1 }) + // Listener in the other tab should work as expected + .client(1) + .expectEvents(limitToLast, { + hasPendingWrites: true, + added: [docCV2], + removed: [docB], + fromCache: true + }) + ); + } + ); + + specTest( + 'Mirror queries being listened in the same secondary client sourced from cache', + ['multi-client'], + () => { + const fullQuery = query('collection'); + const limit = queryWithLimit( + query('collection', orderBy('sort', 'asc')), + 2, + LimitType.First + ); + const limitToLast = queryWithLimit( + query('collection', orderBy('sort', 'desc')), + 2, + LimitType.Last + ); + const docA = doc('collection/a', 1000, { sort: 0 }); + const docB = doc('collection/b', 1000, { sort: 1 }); + const docC = doc('collection/c', 1000, { sort: 1 }); + const docCV2 = doc('collection/c', 2000, { + sort: -1 + }).setHasLocalMutations(); + + return ( + client(0) + .becomeVisible() + // Populate the cache first + .userListens(fullQuery) + .watchAcksFull(fullQuery, 1000, docA, docB, docC) + .expectEvents(fullQuery, { added: [docA, docB, docC] }) + .userUnlistens(fullQuery) + .watchRemoves(fullQuery) + + // Listen to mirror queries in a secondary tab + .client(1) + .userListensToCache(limit) + .expectEvents(limit, { added: [docA, docB], fromCache: true }) + .userListensToCache(limitToLast) + .expectEvents(limitToLast, { added: [docB, docA], fromCache: true }) + // Un-listen to one of the query and do a local mutation + .userUnlistensToCache(limit) + .userSets('collection/c', { sort: -1 }) + // The other listener should work as expected + .expectEvents(limitToLast, { + hasPendingWrites: true, + added: [docCV2], + removed: [docB], + fromCache: true + }) + ); + } + ); + + specTest( + 'Mirror queries being listened from different sources while listening to server in primary tab', + ['multi-client'], + () => { + const limit = queryWithLimit( + query('collection', orderBy('sort', 'asc')), + 2, + LimitType.First + ); + const limitToLast = queryWithLimit( + query('collection', orderBy('sort', 'desc')), + 2, + LimitType.Last + ); + const docA = doc('collection/a', 1000, { sort: 0 }); + const docB = doc('collection/b', 1000, { sort: 1 }); + const docC = doc('collection/c', 2000, { sort: -1 }); + + return ( + // Listen to server in primary client + client(0) + .becomeVisible() + .userListens(limit) + .expectListen(limit) + .watchAcksFull(limit, 1000, docA, docB) + .expectEvents(limit, { added: [docA, docB] }) + //Listen to cache in secondary client + .client(1) + .userListensToCache(limitToLast) + .expectEvents(limitToLast, { added: [docB, docA], fromCache: true }) + // Watch sends document changes + .client(0) + .watchSends({ affects: [limit] }, docC) + .watchSnapshots(2000) + .expectEvents(limit, { added: [docC], removed: [docB] }) + // Cache listener gets notified as well. + .client(1) + .expectEvents(limitToLast, { added: [docC], removed: [docB] }) + ); + } + ); + + specTest( + 'Mirror queries from different sources while listening to server in secondary tab', + ['multi-client'], + () => { + const limit = queryWithLimit( + query('collection', orderBy('sort', 'asc')), + 2, + LimitType.First + ); + const limitToLast = queryWithLimit( + query('collection', orderBy('sort', 'desc')), + 2, + LimitType.Last + ); + const docA = doc('collection/a', 1000, { sort: 0 }); + const docB = doc('collection/b', 1000, { sort: 1 }); + const docC = doc('collection/c', 2000, { sort: -1 }); + + return ( + client(0) + .becomeVisible() + // Listen to server in the secondary client + .client(1) + .userListens(limit) + .client(0) + .expectListen(limit) + .watchAcksFull(limit, 1000, docA, docB) + .client(1) + .expectEvents(limit, { added: [docA, docB] }) + + // Listen to cache in primary client + .client(0) + .userListensToCache(limitToLast) + .expectEvents(limitToLast, { added: [docB, docA], fromCache: true }) + // Watch sends document changes + .watchSends({ affects: [limit] }, docC) + .watchSnapshots(2000) + .expectEvents(limitToLast, { added: [docC], removed: [docB] }) + .client(1) + .expectEvents(limit, { added: [docC], removed: [docB] }) + ); + } + ); + + specTest( + 'Un-listen to listeners from different source', + ['multi-client'], + () => { + const query1 = query('collection'); + const docA = doc('collection/a', 1000, { key: 'a' }); + const docB = doc('collection/b', 1000, { + key: 'b' + }).setHasLocalMutations(); + + return ( + client(0) + .becomeVisible() + // Listen to server in primary client + .userListens(query1) + .watchAcksFull(query1, 1000, docA) + .expectEvents(query1, { added: [docA] }) + // Listen to cache in secondary client + .client(1) + .userListensToCache(query1) + .expectEvents(query1, { added: [docA], fromCache: true }) + .client(0) + .userUnlistens(query1) + .userSets('collection/b', { key: 'b' }) + // The other listener should work as expected + .client(1) + .expectEvents(query1, { + hasPendingWrites: true, + added: [docB], + fromCache: true + }) + .userUnlistensToCache(query1) + ); + } + ); +}); diff --git a/packages/firestore/test/unit/specs/spec_builder.ts b/packages/firestore/test/unit/specs/spec_builder.ts index 44dc623cfe1..d79cca9cd82 100644 --- a/packages/firestore/test/unit/specs/spec_builder.ts +++ b/packages/firestore/test/unit/specs/spec_builder.ts @@ -17,6 +17,10 @@ import { IndexConfiguration } from '../../../src/api/index_configuration'; import { ExpUserDataWriter } from '../../../src/api/reference_impl'; +import { + ListenOptions, + ListenerDataSource as Source +} from '../../../src/core/event_manager'; import { FieldFilter, Filter } from '../../../src/core/filter'; import { LimitType, @@ -266,7 +270,11 @@ export class SpecBuilder { return this; } - userListens(query: Query, resume?: ResumeSpec): this { + private addUserListenStep( + query: Query, + resume?: ResumeSpec, + options?: ListenOptions + ): void { this.nextStep(); const target = queryToTarget(query); @@ -275,7 +283,7 @@ export class SpecBuilder { if (this.injectFailures) { // Return a `userListens()` step but don't advance the target IDs. this.currentStep = { - userListen: { targetId, query: SpecBuilder.queryToSpec(query) } + userListen: { targetId, query: SpecBuilder.queryToSpec(query), options } }; } else { if (this.queryMapping.has(target)) { @@ -285,12 +293,30 @@ export class SpecBuilder { } this.queryMapping.set(target, targetId); - this.addQueryToActiveTargets(targetId, query, resume); + + if (options?.source !== Source.Cache) { + // Active targets are created if listener is not sourcing from cache + this.addQueryToActiveTargets(targetId, query, resume); + } + this.currentStep = { - userListen: { targetId, query: SpecBuilder.queryToSpec(query) }, + userListen: { + targetId, + query: SpecBuilder.queryToSpec(query), + options + }, expectedState: { activeTargets: { ...this.activeTargets } } }; } + } + + userListens(query: Query, resume?: ResumeSpec): this { + this.addUserListenStep(query, resume); + return this; + } + + userListensToCache(query: Query, resume?: ResumeSpec): this { + this.addUserListenStep(query, resume, { source: Source.Cache }); return this; } @@ -320,14 +346,16 @@ export class SpecBuilder { return this; } - userUnlistens(query: Query): this { + userUnlistens(query: Query, shouldRemoveWatchTarget: boolean = true): this { this.nextStep(); const target = queryToTarget(query); if (!this.queryMapping.has(target)) { throw new Error('Unlistening to query not listened to: ' + query); } const targetId = this.queryMapping.get(target)!; - this.removeQueryFromActiveTargets(query, targetId); + if (shouldRemoveWatchTarget) { + this.removeQueryFromActiveTargets(query, targetId); + } if (this.config.useEagerGCForMemory && !this.activeTargets[targetId]) { this.queryMapping.delete(target); @@ -341,6 +369,11 @@ export class SpecBuilder { return this; } + userUnlistensToCache(query: Query): this { + // Listener sourced from cache do not need to close watch stream. + return this.userUnlistens(query, /** shouldRemoveWatchTarget= */ false); + } + userSets(key: string, value: JsonObject): this { this.nextStep(); this.currentStep = { @@ -929,30 +962,46 @@ export class SpecBuilder { return this; } - /** Registers a query that is active in another tab. */ - expectListen(query: Query, resume?: ResumeSpec): this { + private registerQuery( + query: Query, + shouldAddWatchTarget: boolean, + resume?: ResumeSpec + ): this { this.assertStep('Expectations require previous step'); const target = queryToTarget(query); const targetId = this.queryIdGenerator.cachedId(target); this.queryMapping.set(target, targetId); - this.addQueryToActiveTargets(targetId, query, resume); - + if (shouldAddWatchTarget) { + this.addQueryToActiveTargets(targetId, query, resume); + } const currentStep = this.currentStep!; currentStep.expectedState = currentStep.expectedState || {}; currentStep.expectedState.activeTargets = { ...this.activeTargets }; return this; } - /** Removes a query that is no longer active in any tab. */ - expectUnlisten(query: Query): this { + /** Registers a query that is active in another tab. */ + expectListen(query: Query, resume?: ResumeSpec): this { + return this.registerQuery(query, true, resume); + } + + /** Registers a query that is listening to cache and active in another tab. */ + expectListenToCache(query: Query, resume?: ResumeSpec): this { + // Listeners that source from cache would not send watch request. + return this.registerQuery(query, false, resume); + } + + removeQuery(query: Query, shouldRemoveWatchTarget: boolean = true): this { this.assertStep('Expectations require previous step'); const target = queryToTarget(query); const targetId = this.queryMapping.get(target)!; - this.removeQueryFromActiveTargets(query, targetId); + if (shouldRemoveWatchTarget) { + this.removeQueryFromActiveTargets(query, targetId); + } if (this.config.useEagerGCForMemory && !this.activeTargets[targetId]) { this.queryMapping.delete(target); @@ -965,6 +1014,17 @@ export class SpecBuilder { return this; } + /** Removes a query that is no longer active in any tab. */ + expectUnlisten(query: Query): this { + return this.removeQuery(query); + } + + /** Removes a query that is listening to cache and no longer active in any tab. */ + expectUnlistenToCache(query: Query): this { + // Listeners that source from cache did not establish watch connection, so no active targets to remove. + return this.removeQuery(query, false); + } + /** * Verifies the total number of requests sent to the write backend since test * initialization. diff --git a/packages/firestore/test/unit/specs/spec_test_runner.ts b/packages/firestore/test/unit/specs/spec_test_runner.ts index 1245a1c0231..b34421d9e0a 100644 --- a/packages/firestore/test/unit/specs/spec_test_runner.ts +++ b/packages/firestore/test/unit/specs/spec_test_runner.ts @@ -37,7 +37,9 @@ import { Observer, QueryListener, removeSnapshotsInSyncListener, - addSnapshotsInSyncListener + addSnapshotsInSyncListener, + ListenOptions, + ListenerDataSource as Source } from '../../../src/core/event_manager'; import { canonifyQuery, @@ -59,7 +61,9 @@ import { syncEngineListen, syncEngineLoadBundle, syncEngineUnlisten, - syncEngineWrite + syncEngineWrite, + triggerRemoteStoreListen, + triggerRemoteStoreUnlisten } from '../../../src/core/sync_engine_impl'; import { TargetId } from '../../../src/core/types'; import { @@ -353,6 +357,13 @@ abstract class TestRunner { this.syncEngine ); + this.eventManager.onFirstRemoteStoreListen = triggerRemoteStoreListen.bind( + null, + this.syncEngine + ); + this.eventManager.onLastRemoteStoreUnlisten = + triggerRemoteStoreUnlisten.bind(null, this.syncEngine); + await this.persistence.setDatabaseDeletedListener(async () => { await this.shutdown(); }); @@ -482,11 +493,14 @@ abstract class TestRunner { } this.pushEvent(e); }); - // TODO(dimond): Allow customizing listen options in spec tests + const options = { - includeMetadataChanges: true, - waitForSyncWhenOnline: false + includeMetadataChanges: + listenSpec.options?.includeMetadataChanges ?? true, + waitForSyncWhenOnline: false, + source: listenSpec.options?.source ?? Source.Default }; + const queryListener = new QueryListener(query, aggregator, options); this.queryListeners.set(query, queryListener); @@ -509,8 +523,12 @@ abstract class TestRunner { ); } - if (this.isPrimaryClient && this.networkEnabled) { - // Open should always have happened after a listen + if ( + this.isPrimaryClient && + this.networkEnabled && + options.source !== Source.Cache + ) { + // Unless listened to cache, open always have happened after a listen. await this.connection.waitForWatchOpen(); } } @@ -1542,6 +1560,7 @@ export interface SpecStep { export interface SpecUserListen { targetId: TargetId; query: string | SpecQuery; + options?: ListenOptions; } /** [, ] */