Skip to content

Commit

Permalink
snapshot listeners source from cache (#7982)
Browse files Browse the repository at this point in the history
  • Loading branch information
milaGGL authored Mar 11, 2024
1 parent 6d487d7 commit ce88e71
Show file tree
Hide file tree
Showing 17 changed files with 2,055 additions and 81 deletions.
5 changes: 5 additions & 0 deletions .changeset/smart-games-cheer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@firebase/firestore': minor
'firebase': minor
---
Enable snapshot listener option to retrieve data from local cache only.
4 changes: 4 additions & 0 deletions common/api-review/firestore.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,9 @@ export function limit(limit: number): QueryLimitConstraint;
// @public
export function limitToLast(limit: number): QueryLimitConstraint;

// @public
export type ListenSource = 'default' | 'cache';

// @public
export function loadBundle(firestore: Firestore, bundleData: ReadableStream<Uint8Array> | ArrayBuffer | string): LoadBundleTask;

Expand Down Expand Up @@ -651,6 +654,7 @@ export function snapshotEqual<AppModelType, DbModelType extends DocumentData>(le
// @public
export interface SnapshotListenOptions {
readonly includeMetadataChanges?: boolean;
readonly source?: ListenSource;
}

// @public
Expand Down
13 changes: 13 additions & 0 deletions docs-devsite/firestore_.md
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ https://github.com/firebase/firebase-js-sdk
| [DocumentChangeType](./firestore_.md#documentchangetype) | The type of a <code>DocumentChange</code> may be 'added', 'removed', or 'modified'. |
| [FirestoreErrorCode](./firestore_.md#firestoreerrorcode) | The set of Firestore status codes. The codes are the same at the ones exposed by gRPC here: https://github.com/grpc/grpc/blob/master/doc/statuscodes.md<!-- -->Possible values: - 'cancelled': The operation was cancelled (typically by the caller). - 'unknown': Unknown error or an error from a different error domain. - 'invalid-argument': Client specified an invalid argument. Note that this differs from 'failed-precondition'. 'invalid-argument' indicates arguments that are problematic regardless of the state of the system (e.g. an invalid field name). - 'deadline-exceeded': Deadline expired before operation could complete. For operations that change the state of the system, this error may be returned even if the operation has completed successfully. For example, a successful response from a server could have been delayed long enough for the deadline to expire. - 'not-found': Some requested document was not found. - 'already-exists': Some document that we attempted to create already exists. - 'permission-denied': The caller does not have permission to execute the specified operation. - 'resource-exhausted': Some resource has been exhausted, perhaps a per-user quota, or perhaps the entire file system is out of space. - 'failed-precondition': Operation was rejected because the system is not in a state required for the operation's execution. - 'aborted': The operation was aborted, typically due to a concurrency issue like transaction aborts, etc. - 'out-of-range': Operation was attempted past the valid range. - 'unimplemented': Operation is not implemented or not supported/enabled. - 'internal': Internal errors. Means some invariants expected by underlying system has been broken. If you see one of these errors, something is very broken. - 'unavailable': The service is currently unavailable. This is most likely a transient condition and may be corrected by retrying with a backoff. - 'data-loss': Unrecoverable data loss or corruption. - 'unauthenticated': The request does not have valid authentication credentials for the operation. |
| [FirestoreLocalCache](./firestore_.md#firestorelocalcache) | Union type from all supported SDK cache layer. |
| [ListenSource](./firestore_.md#listensource) | Describe the source a query listens to.<!-- -->Set to <code>default</code> to listen to both cache and server changes. Set to <code>cache</code> to listen to changes in cache only. |
| [MemoryGarbageCollector](./firestore_.md#memorygarbagecollector) | Union type from all support gabage collectors for memory local cache. |
| [NestedUpdateFields](./firestore_.md#nestedupdatefields) | For each field (e.g. 'bar'), find all nested keys (e.g. {<!-- -->'bar.baz': T1, 'bar.qux': T2<!-- -->}<!-- -->). Intersect them together to make a single map containing all possible keys that are all marked as optional |
| [OrderByDirection](./firestore_.md#orderbydirection) | The direction of a [orderBy()](./firestore_.md#orderby_006d61f) clause is specified as 'desc' or 'asc' (descending or ascending). |
Expand Down Expand Up @@ -2551,6 +2552,18 @@ Union type from all supported SDK cache layer.
export declare type FirestoreLocalCache = MemoryLocalCache | PersistentLocalCache;
```

## ListenSource

Describe the source a query listens to.

Set to `default` to listen to both cache and server changes. Set to `cache` to listen to changes in cache only.

<b>Signature:</b>

```typescript
export declare type ListenSource = 'default' | 'cache';
```

## MemoryGarbageCollector

Union type from all support gabage collectors for memory local cache.
Expand Down
11 changes: 11 additions & 0 deletions docs-devsite/firestore_.snapshotlistenoptions.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ export declare interface SnapshotListenOptions
| Property | Type | Description |
| --- | --- | --- |
| [includeMetadataChanges](./firestore_.snapshotlistenoptions.md#snapshotlistenoptionsincludemetadatachanges) | boolean | Include a change even if only the metadata of the query or of a document changed. Default is false. |
| [source](./firestore_.snapshotlistenoptions.md#snapshotlistenoptionssource) | [ListenSource](./firestore_.md#listensource) | Set the source the query listens to. Default to "default", which listens to both cache and server. |

## SnapshotListenOptions.includeMetadataChanges

Expand All @@ -33,3 +34,13 @@ Include a change even if only the metadata of the query or of a document changed
```typescript
readonly includeMetadataChanges?: boolean;
```

## SnapshotListenOptions.source

Set the source the query listens to. Default to "default", which listens to both cache and server.

<b>Signature:</b>

```typescript
readonly source?: ListenSource;
```
6 changes: 5 additions & 1 deletion packages/firestore/src/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,11 @@ export {
WhereFilterOp
} from './api/filter';

export { SnapshotListenOptions, Unsubscribe } from './api/reference_impl';
export {
ListenSource,
SnapshotListenOptions,
Unsubscribe
} from './api/reference_impl';

export { TransactionOptions } from './api/transaction_options';

Expand Down
21 changes: 19 additions & 2 deletions packages/firestore/src/api/reference_impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
NextFn,
PartialObserver
} from '../api/observer';
import { ListenerDataSource } from '../core/event_manager';
import {
firestoreClientAddSnapshotsInSyncListener,
firestoreClientGetDocumentFromLocalCache,
Expand Down Expand Up @@ -78,8 +79,22 @@ export interface SnapshotListenOptions {
* changed. Default is false.
*/
readonly includeMetadataChanges?: boolean;

/**
* Set the source the query listens to. Default to "default", which
* listens to both cache and server.
*/
readonly source?: ListenSource;
}

/**
* Describe the source a query listens to.
*
* Set to `default` to listen to both cache and server changes. Set to `cache`
* to listen to changes in cache only.
*/
export type ListenSource = 'default' | 'cache';

/**
* Reads the document referred to by this `DocumentReference`.
*
Expand Down Expand Up @@ -668,7 +683,8 @@ export function onSnapshot<AppModelType, DbModelType extends DocumentData>(
reference = getModularInstance(reference);

let options: SnapshotListenOptions = {
includeMetadataChanges: false
includeMetadataChanges: false,
source: 'default'
};
let currArg = 0;
if (typeof args[currArg] === 'object' && !isPartialObserver(args[currArg])) {
Expand All @@ -677,7 +693,8 @@ export function onSnapshot<AppModelType, DbModelType extends DocumentData>(
}

const internalOptions = {
includeMetadataChanges: options.includeMetadataChanges
includeMetadataChanges: options.includeMetadataChanges,
source: options.source as ListenerDataSource
};

if (isPartialObserver(args[currArg])) {
Expand Down
168 changes: 144 additions & 24 deletions packages/firestore/src/core/event_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ import { ChangeType, DocumentViewChange, ViewSnapshot } from './view_snapshot';
class QueryListenersInfo {
viewSnap: ViewSnapshot | undefined = undefined;
listeners: QueryListener[] = [];

// Helper methods that checks if the query has listeners that listening to remote store
hasRemoteListeners(): boolean {
return this.listeners.some(listener => listener.listensToRemoteStore());
}
}

/**
Expand All @@ -52,8 +57,13 @@ export interface Observer<T> {
* allows users to tree-shake the Watch logic.
*/
export interface EventManager {
onListen?: (query: Query) => Promise<ViewSnapshot>;
onUnlisten?: (query: Query) => Promise<void>;
onListen?: (
query: Query,
enableRemoteListen: boolean
) => Promise<ViewSnapshot>;
onUnlisten?: (query: Query, disableRemoteListen: boolean) => Promise<void>;
onFirstRemoteStoreListen?: (query: Query) => Promise<void>;
onLastRemoteStoreUnlisten?: (query: Query) => Promise<void>;
}

export function newEventManager(): EventManager {
Expand All @@ -71,38 +81,104 @@ export class EventManagerImpl implements EventManager {
snapshotsInSyncListeners: Set<Observer<void>> = new Set();

/** Callback invoked when a Query is first listen to. */
onListen?: (query: Query) => Promise<ViewSnapshot>;
onListen?: (
query: Query,
enableRemoteListen: boolean
) => Promise<ViewSnapshot>;
/** Callback invoked once all listeners to a Query are removed. */
onUnlisten?: (query: Query) => Promise<void>;
onUnlisten?: (query: Query, disableRemoteListen: boolean) => Promise<void>;

/**
* Callback invoked when a Query starts listening to the remote store, while
* already listening to the cache.
*/
onFirstRemoteStoreListen?: (query: Query) => Promise<void>;
/**
* Callback invoked when a Query stops listening to the remote store, while
* still listening to the cache.
*/
onLastRemoteStoreUnlisten?: (query: Query) => Promise<void>;
}

function validateEventManager(eventManagerImpl: EventManagerImpl): void {
debugAssert(!!eventManagerImpl.onListen, 'onListen not set');
debugAssert(
!!eventManagerImpl.onFirstRemoteStoreListen,
'onFirstRemoteStoreListen not set'
);
debugAssert(!!eventManagerImpl.onUnlisten, 'onUnlisten not set');
debugAssert(
!!eventManagerImpl.onLastRemoteStoreUnlisten,
'onLastRemoteStoreUnlisten not set'
);
}

const enum ListenerSetupAction {
InitializeLocalListenAndRequireWatchConnection,
InitializeLocalListenOnly,
RequireWatchConnectionOnly,
NoActionRequired
}

const enum ListenerRemovalAction {
TerminateLocalListenAndRequireWatchDisconnection,
TerminateLocalListenOnly,
RequireWatchDisconnectionOnly,
NoActionRequired
}

export async function eventManagerListen(
eventManager: EventManager,
listener: QueryListener
): Promise<void> {
const eventManagerImpl = debugCast(eventManager, EventManagerImpl);
validateEventManager(eventManagerImpl);

let listenerAction = ListenerSetupAction.NoActionRequired;

debugAssert(!!eventManagerImpl.onListen, 'onListen not set');
const query = listener.query;
let firstListen = false;

let queryInfo = eventManagerImpl.queries.get(query);
if (!queryInfo) {
firstListen = true;
queryInfo = new QueryListenersInfo();
listenerAction = listener.listensToRemoteStore()
? ListenerSetupAction.InitializeLocalListenAndRequireWatchConnection
: ListenerSetupAction.InitializeLocalListenOnly;
} else if (
!queryInfo.hasRemoteListeners() &&
listener.listensToRemoteStore()
) {
// Query has been listening to local cache, and tries to add a new listener sourced from watch.
listenerAction = ListenerSetupAction.RequireWatchConnectionOnly;
}

if (firstListen) {
try {
queryInfo.viewSnap = await eventManagerImpl.onListen(query);
} catch (e) {
const firestoreError = wrapInUserErrorIfRecoverable(
e as Error,
`Initialization of query '${stringifyQuery(listener.query)}' failed`
);
listener.onError(firestoreError);
return;
try {
switch (listenerAction) {
case ListenerSetupAction.InitializeLocalListenAndRequireWatchConnection:
queryInfo.viewSnap = await eventManagerImpl.onListen!(
query,
/** enableRemoteListen= */ true
);
break;
case ListenerSetupAction.InitializeLocalListenOnly:
queryInfo.viewSnap = await eventManagerImpl.onListen!(
query,
/** enableRemoteListen= */ false
);
break;
case ListenerSetupAction.RequireWatchConnectionOnly:
await eventManagerImpl.onFirstRemoteStoreListen!(query);
break;
default:
break;
}
} catch (e) {
const firestoreError = wrapInUserErrorIfRecoverable(
e as Error,
`Initialization of query '${stringifyQuery(listener.query)}' failed`
);
listener.onError(firestoreError);
return;
}

eventManagerImpl.queries.set(query, queryInfo);
Expand Down Expand Up @@ -130,23 +206,47 @@ export async function eventManagerUnlisten(
listener: QueryListener
): Promise<void> {
const eventManagerImpl = debugCast(eventManager, EventManagerImpl);
validateEventManager(eventManagerImpl);

debugAssert(!!eventManagerImpl.onUnlisten, 'onUnlisten not set');
const query = listener.query;
let lastListen = false;
let listenerAction = ListenerRemovalAction.NoActionRequired;

const queryInfo = eventManagerImpl.queries.get(query);
if (queryInfo) {
const i = queryInfo.listeners.indexOf(listener);
if (i >= 0) {
queryInfo.listeners.splice(i, 1);
lastListen = queryInfo.listeners.length === 0;

if (queryInfo.listeners.length === 0) {
listenerAction = listener.listensToRemoteStore()
? ListenerRemovalAction.TerminateLocalListenAndRequireWatchDisconnection
: ListenerRemovalAction.TerminateLocalListenOnly;
} else if (
!queryInfo.hasRemoteListeners() &&
listener.listensToRemoteStore()
) {
// The removed listener is the last one that sourced from watch.
listenerAction = ListenerRemovalAction.RequireWatchDisconnectionOnly;
}
}
}

if (lastListen) {
eventManagerImpl.queries.delete(query);
return eventManagerImpl.onUnlisten(query);
switch (listenerAction) {
case ListenerRemovalAction.TerminateLocalListenAndRequireWatchDisconnection:
eventManagerImpl.queries.delete(query);
return eventManagerImpl.onUnlisten!(
query,
/** disableRemoteListen= */ true
);
case ListenerRemovalAction.TerminateLocalListenOnly:
eventManagerImpl.queries.delete(query);
return eventManagerImpl.onUnlisten!(
query,
/** disableRemoteListen= */ false
);
case ListenerRemovalAction.RequireWatchDisconnectionOnly:
return eventManagerImpl.onLastRemoteStoreUnlisten!(query);
default:
return;
}
}

Expand Down Expand Up @@ -241,6 +341,14 @@ function raiseSnapshotsInSyncEvent(eventManagerImpl: EventManagerImpl): void {
});
}

export enum ListenerDataSource {
/** Listen to both cache and server changes */
Default = 'default',

/** Listen to changes in cache only */
Cache = 'cache'
}

export interface ListenOptions {
/** Raise events even when only the metadata changes */
readonly includeMetadataChanges?: boolean;
Expand All @@ -250,6 +358,9 @@ export interface ListenOptions {
* offline.
*/
readonly waitForSyncWhenOnline?: boolean;

/** Set the source events raised from. */
readonly source?: ListenerDataSource;
}

/**
Expand Down Expand Up @@ -359,6 +470,11 @@ export class QueryListener {
return true;
}

// Always raise event if listening to cache
if (!this.listensToRemoteStore()) {
return true;
}

// NOTE: We consider OnlineState.Unknown as online (it should become Offline
// or Online if we wait long enough).
const maybeOnline = onlineState !== OnlineState.Offline;
Expand Down Expand Up @@ -417,4 +533,8 @@ export class QueryListener {
this.raisedInitialEvent = true;
this.queryObserver.next(snap);
}

listensToRemoteStore(): boolean {
return this.options.source !== ListenerDataSource.Cache;
}
}
Loading

0 comments on commit ce88e71

Please sign in to comment.