From 4bef3eb02a21072d61796892480893e83be07717 Mon Sep 17 00:00:00 2001 From: Shaya Potter Date: Mon, 25 Dec 2023 12:59:03 +0200 Subject: [PATCH 1/3] CSC POC --- packages/client/lib/RESP/types.ts | 6 + packages/client/lib/client/cache.ts | 501 +++++++++++++++++++ packages/client/lib/client/commands-queue.ts | 18 +- packages/client/lib/client/index.ts | 108 +++- packages/client/lib/client/linked-list.ts | 10 +- packages/client/lib/client/pool.ts | 78 ++- packages/client/lib/client/socket.ts | 7 + packages/client/lib/commands/GET.ts | 6 + 8 files changed, 701 insertions(+), 33 deletions(-) create mode 100644 packages/client/lib/client/cache.ts diff --git a/packages/client/lib/RESP/types.ts b/packages/client/lib/RESP/types.ts index 9f0e9217345..3869eede671 100644 --- a/packages/client/lib/RESP/types.ts +++ b/packages/client/lib/RESP/types.ts @@ -263,6 +263,11 @@ export type CommandArguments = Array & { preserve?: unknown }; // response?: ResponsePolicies | null; // }; +export interface CacheInfo { + cacheKey: string; + redisKeys: Array +} + export type Command = { FIRST_KEY_INDEX?: number | ((this: void, ...args: Array) => RedisArgument | undefined); IS_READ_ONLY?: boolean; @@ -275,6 +280,7 @@ export type Command = { transformArguments(this: void, ...args: Array): CommandArguments; TRANSFORM_LEGACY_REPLY?: boolean; transformReply: TransformReply | Record; + getCacheInfo?: (...args: Array) => CacheInfo | undefined; }; export type RedisCommands = Record; diff --git a/packages/client/lib/client/cache.ts b/packages/client/lib/client/cache.ts new file mode 100644 index 00000000000..838434abffa --- /dev/null +++ b/packages/client/lib/client/cache.ts @@ -0,0 +1,501 @@ +import RedisClient from '.'; +import { Command, ReplyUnion, TransformReply } from '../RESP/types'; + +type CachingClient = RedisClient; +type CmdFunc = () => Promise; + +export interface ClientSideCacheConfig { + ttl: number; + maxEntries: number; + lru: boolean; +} + +type CacheCreator = { + epoch: number; + client: CachingClient; +}; + +interface ClientSideCacheEntry { + invalidate(): void; + validate(): boolean; +} + +abstract class ClientSideCacheEntryBase implements ClientSideCacheEntry { + #invalidated = false; + readonly #ttl: number; + readonly #created: number; + + constructor(ttl: number) { + this.#ttl = ttl; + this.#created = Date.now(); + } + + invalidate(): void { + this.#invalidated = true; + } + + validate(): boolean { + return !this.#invalidated && (this.#ttl == 0 || (Date.now() - this.#created) < this.#ttl) + } +} + +export class ClientSideCacheEntryValue extends ClientSideCacheEntryBase { + readonly #value: any; + + get value() { + return this.#value; + } + + constructor(ttl: number, value: any) { + super(ttl); + this.#value = value; + } +} + +export class ClientSideCacheEntryPromise extends ClientSideCacheEntryBase { + readonly #sendCommandPromise: Promise; + + get promise() { + return this.#sendCommandPromise; + } + + constructor(ttl: number, sendCommandPromise: Promise) { + super(ttl); + this.#sendCommandPromise = sendCommandPromise; + } +} + +/* +The reason for using abstract class vs interface, is that interfaces aren't part of the type hierarchy, abstract classes are +Therefore, one can restrict union types with a `instanceof abstract class` that one can't do with union types. +This allows us to easily have a clientSideCache config option that takes a ClientSideCacheProvider object or a config statement and +easily distinguish them in code. +i.e. + clientSideCache?: ClientSideCacheProvider | ClientSideCacheConfig; +and + if (clientSideCache) { + if (clientSideCache instance of ClientSideCacheProvider) { + ... + } else { + // it's a ClientSideCacheConfig object + } +*/ +export abstract class ClientSideCacheProvider { + abstract handleCache(client: CachingClient, cmd: Command, args: Array, fn: CmdFunc, transformReply: TransformReply, preserve?: any): Promise; + abstract trackingOn(): Array; + abstract invalidate(key: string): void; + abstract clear(): void; + abstract cacheHits(): number; + abstract cacheMisses(): number; + abstract clearOnReconnect(): boolean; +} + +export class BasicClientSideCache extends ClientSideCacheProvider { + #cacheKeyToEntryMap: Map; + #keytoCacheKeySetMap: Map>; + readonly ttl: number; + readonly #maxEntries: number; + readonly #lru: boolean; + #cacheHits = 0; + #cacheMisses = 0; + + constructor(ttl: number, maxEntries: number, lru: boolean) { + super(); + + this.#cacheKeyToEntryMap = new Map(); + this.#keytoCacheKeySetMap = new Map>(); + this.ttl = ttl; + this.#maxEntries = maxEntries; + this.#lru = lru; + } + + override clearOnReconnect(): boolean { + return true; + } + + /* logic of how caching works: + + 1. every command that's cachable provide a getCacheInfo(args) function + This function (if defined for cachable commands) returns a struct with 2 elements, + cacheKey - a unique key that corresponds to this command and its arguments + redisKeys - an array of redis keys as strings that if the key is modified, will cause redis to invalidate this result when cached + 2. check if cacheKey is in our cache + 2a. if it is, validate (in the case of a pooled cache without redirect) + 2b1. if valid + 2b1a. if its a value cacheEntry - return it + 2b1b. if it's a promise cache entry - wait on promise and then go to 3c. + 3. if cacheEntry is not in cache + 3a. send the command save the promise into a a cacheEntry and then wait on result + 3b. check the cacheEntry is still valid - in cache and hasn't been deleted) + 3c. if valid - overwrite with value entry + 4. return previously non cached result + */ + override async handleCache(client: CachingClient, cmd: Command, args: Array, fn: CmdFunc, transformReply: TransformReply, preserve?: any) { + let reply: ReplyUnion; + + // "1" - no caching if typemapping in use. + const cacheInfo = cmd.getCacheInfo?.(args); + const cacheKey = cacheInfo?.cacheKey; + + // "2" + let cacheEntry = this.get(cacheKey); + if (cacheEntry && !cacheEntry.validate()) { // "2a" + console.log("invalidating cache entry as old epoch"); + this.delete(cacheKey!); + cacheEntry = undefined; + } + + // "2b1" + if (cacheEntry) { + if (cacheEntry instanceof ClientSideCacheEntryValue) { // "2b1a" + console.log("returning value from cache"); + this.cacheHit(); + + return cacheEntry.value; + } else if (cacheEntry instanceof ClientSideCacheEntryPromise) { // 2b1b + // unsure if this should be considered a cache hit, a miss, or neither? + reply = await cacheEntry.promise; + } else { + throw new Error("unknown cache entry type"); + } + } else { // 3/3a + if (cacheInfo) { // something can't be a cache miss if it wasn't cacheable. + this.cacheMiss(); + } + const promise = fn(); + if (cacheKey) { + cacheEntry = this.createPromiseEntry(client, promise); + this.set(cacheKey, cacheEntry, cacheInfo.redisKeys); + } + try { + reply = await promise; + } catch (err) { + if (cacheKey) { // on error, have to remove promise from cache + this.delete(cacheKey); + } + throw err; + } + } + + const val = transformReply ? transformReply(reply, preserve) : reply; + + // 3b + if (cacheInfo) { + // cacheInfo being defnined implies cachable, which implies that cacheEntry has to exist + if (cacheEntry!.validate()) { // revalidating promise entry (dont save value, if promise entry has been invalidated) + // 3c + console.log("saving value to cache"); + cacheEntry = this.createValueEntry(client, val); + this.set(cacheInfo.cacheKey, cacheEntry, cacheInfo.redisKeys); + } else { + console.log("cache entry for key got invalidated between execution and saving, so not saving"); + } + } + + return val; + } + + override trackingOn() { + return ['CLIENT', 'TRACKING', 'ON']; + } + + override invalidate(key: string) { + console.log(`invalidate: ${key}`); + const set = this.#keytoCacheKeySetMap.get(key); + if (set) { + for (const cacheKey of set) { + console.log(`invalidate: got ${cacheKey} from key ${key} set`); + const entry = this.#cacheKeyToEntryMap.get(cacheKey); + if (entry) { + entry.invalidate(); + } + this.#cacheKeyToEntryMap.delete(cacheKey); + } + this.#keytoCacheKeySetMap.delete(key); + } + } + + override clear() { + this.#cacheKeyToEntryMap.clear(); + this.#keytoCacheKeySetMap.clear(); + this.#cacheHits = 0; + this.#cacheMisses = 0; + } + + get(cacheKey?: string | undefined) { + if (cacheKey === undefined) { + return undefined + } + + const val = this.#cacheKeyToEntryMap.get(cacheKey); + if (val !== undefined && this.#lru) { + this.#cacheKeyToEntryMap.delete(cacheKey); + this.#cacheKeyToEntryMap.set(cacheKey, val); + } + + return val; + } + + delete(cacheKey: string) { + const entry = this.#cacheKeyToEntryMap.get(cacheKey); + if (entry) { + entry.invalidate(); + this.#cacheKeyToEntryMap.delete(cacheKey); + } + } + + has(cacheKey: string) { + return this.#cacheKeyToEntryMap.has(cacheKey); + } + + set(cacheKey: string, cacheEntry: ClientSideCacheEntry, keys: Array) { + let count = this.#cacheKeyToEntryMap.size; + const oldEntry = this.#cacheKeyToEntryMap.get(cacheKey); + if (oldEntry) { + count--; // overwriting, so not incrementig + oldEntry.invalidate(); + } + + if (this.#maxEntries > 0 && count >= this.#maxEntries) { + this.deleteOldest(); + } + + this.#cacheKeyToEntryMap.set(cacheKey, cacheEntry); + + for (const key of keys) { + if (!this.#keytoCacheKeySetMap.has(key)) { + this.#keytoCacheKeySetMap.set(key, new Set()); + } + + const cacheKeySet = this.#keytoCacheKeySetMap.get(key); + cacheKeySet!.add(cacheKey); + } + } + + size() { + return this.#cacheKeyToEntryMap.size; + } + + createValueEntry(client: RedisClient, value: any): ClientSideCacheEntryValue { + return new ClientSideCacheEntryValue(this.ttl, value); + } + + createPromiseEntry(client: RedisClient, sendCommandPromise: Promise): ClientSideCacheEntryPromise { + return new ClientSideCacheEntryPromise(this.ttl, sendCommandPromise); + } + + cacheHit(): void { + this.#cacheHits++; + } + + cacheMiss(): void { + this.#cacheMisses++; + } + + override cacheHits(): number { + return this.#cacheHits; + } + + override cacheMisses(): number { + return this.#cacheMisses; + } + + /** + * @internal + */ + deleteOldest() { + const it = this.#cacheKeyToEntryMap[Symbol.iterator](); + const n = it.next(); + if (!n.done) { + this.#cacheKeyToEntryMap.delete(n.value[0]); + } + } + + /** + * @internal + */ + entryEntries() { + return this.#cacheKeyToEntryMap.entries(); + } + + /** + * @internal + */ + keySetEntries() { + return this.#keytoCacheKeySetMap.entries(); + } +} + +export abstract class PooledClientSideCacheProvider extends BasicClientSideCache { + abstract updateRedirect(id: number): void; + abstract addClient(client: RedisClient): void; + abstract removeClient(client: RedisClient): void; +} + +// doesn't do anything special in pooling, clears cache on every client disconnect +export class BasicPooledClientSideCache extends PooledClientSideCacheProvider { + override updateRedirect(id: number): void { + return; + } + + override addClient(client: RedisClient): void { + return; + } + override removeClient(client: RedisClient): void { + return; + } +} + +class PooledClientSideCacheEntryValue extends ClientSideCacheEntryValue { + #creator: CacheCreator; + + constructor(ttl: number, creator: CacheCreator, value: any) { + super(ttl, value); + + this.#creator = creator; + } + + override validate(): boolean { + let ret = super.validate(); + if (this.#creator) { + ret = ret && this.#creator.client.isReady && this.#creator.client.socketEpoch == this.#creator.epoch + } + + return ret; + } +} + +class PooledClientSideCacheEntryPromise extends ClientSideCacheEntryPromise { + #creator: CacheCreator; + + constructor(ttl: number, creator: CacheCreator, sendCommandPromise: Promise) { + super(ttl, sendCommandPromise); + + this.#creator = creator; + } + + override validate(): boolean { + let ret = super.validate(); + if (this.#creator) { + ret = ret && this.#creator.client.isReady && this.#creator.client.socketEpoch == this.#creator.epoch + } + + return ret; + } +} + +// Doesn't clear cache on client disconnect, validates entries on retrieval +export class PooledNoRedirectClientSideCache extends BasicPooledClientSideCache { + override createValueEntry(client: RedisClient, value: any): ClientSideCacheEntryValue { + const creator = { + epoch: client.socketEpoch, + client: client + }; + + return new PooledClientSideCacheEntryValue(this.ttl, creator, value); + } + + override createPromiseEntry(client: RedisClient, sendCommandPromise: Promise): ClientSideCacheEntryPromise { + const creator = { + epoch: client.socketEpoch, + client: client + }; + + return new PooledClientSideCacheEntryPromise(this.ttl, creator, sendCommandPromise); + } + + override clearOnReconnect(): boolean { + return false; + } +} + +// Only clears cache on "management"/"redirect" client disconnect +export class PooledRedirectClientSideCache extends PooledClientSideCacheProvider { + #id?: number; + #clients: Set = new Set(); + #disabled = true; + #redirectClient?: CachingClient; + + constructor(ttl: number, maxEntries: number, lru: boolean) { + super(ttl, maxEntries, lru); + } + + disable() { + this.#disabled = true; + } + + enable() { + this.#disabled = false; + } + + override get(cacheKey: string) { + if (this.#disabled) { + return undefined; + } + + return super.get(cacheKey); + } + + override has(cacheKey: string) { + if (this.#disabled) { + return false; + } + + return super.has(cacheKey); + } + + override trackingOn(): string[] { + if (this.#id) { + return ['CLIENT', 'TRACKING', 'ON', 'REDIRECT', this.#id.toString()]; + } else { + return []; + } + } + + override updateRedirect(id: number) { + this.#id = id; + for (const client of this.#clients) { + client.sendCommand(this.trackingOn()).catch(() => {}); + } + } + + override addClient(client: RedisClient) { + this.#clients.add(client); + } + + override removeClient(client: RedisClient) { + this.#clients.delete(client); + } + + override clearOnReconnect(): boolean { + return false; + } + + setRedirectClient(client: CachingClient) { + this.#redirectClient = client; + } + + destroy() { + this.clear(); + + if (this.#redirectClient) { + this.#id = undefined; + const client = this.#redirectClient; + this.#redirectClient = undefined; + + client.destroy(); + } + } + + async close() { + this.clear(); + + if (this.#redirectClient) { + this.#id = undefined; + const client = this.#redirectClient; + this.#redirectClient = undefined; + + return client.close(); + } + } +} \ No newline at end of file diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index 034c1e46bd8..3d04d824022 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -56,6 +56,8 @@ export default class RedisCommandsQueue { return this.#pubSub.isActive; } + #invalidateCallback?: (key: string) => unknown; + constructor( respVersion: RespVersions, maxLength: number | null | undefined, @@ -109,13 +111,27 @@ export default class RedisCommandsQueue { onErrorReply: err => this.#onErrorReply(err), onPush: push => { if (!this.#onPush(push)) { - + switch (push[0].toString()) { + case "invalidate": { + console.log("invalidate push message"); + if (this.#invalidateCallback) { + for (const key of push[1]) { + this.#invalidateCallback(key.toString()); + } + } + break; + } + } } }, getTypeMapping: () => this.#getTypeMapping() }); } + setInvalidateCallback(callback?: (key: string) => unknown) { + this.#invalidateCallback = callback; + } + addCommand( args: CommandArguments, options?: CommandOptions diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index d6b9e3714d4..a9a2240ae25 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -14,6 +14,7 @@ import HELLO, { HelloOptions } from '../commands/HELLO'; import { ScanOptions, ScanCommonOptions } from '../commands/SCAN'; import { RedisLegacyClient, RedisLegacyClientType } from './legacy-mode'; import { RedisPoolOptions, RedisClientPool } from './pool'; +import { BasicClientSideCache, ClientSideCacheConfig, ClientSideCacheProvider } from './cache'; export interface RedisClientOptions< M extends RedisModules = RedisModules, @@ -69,6 +70,10 @@ export interface RedisClientOptions< * TODO */ commandOptions?: CommandOptions; + /** + * TODO + */ + clientSideCache?: ClientSideCacheProvider | ClientSideCacheConfig; } type WithCommands< @@ -150,22 +155,39 @@ export default class RedisClient< static #createCommand(command: Command, resp: RespVersions) { const transformReply = getTransformReply(command, resp); return async function (this: ProxyClient, ...args: Array) { - const redisArgs = command.transformArguments(...args), - reply = await this.sendCommand(redisArgs, this._commandOptions); - return transformReply ? - transformReply(reply, redisArgs.preserve) : - reply; - }; + const redisArgs = command.transformArguments(...args); + const fn = () => { return this.sendCommand(redisArgs, this._commandOptions); }; + + // I think this would require fundamental changes, type mapping occurs in the sendCommand (before reply processing), + // to support typemapping, we would have to cache the "resp" response, pre type mapping, and then every time, typemap + // it if neccessary, and then transform it. + const defaultTypeMapping = this._self.#options?.commandOptions === this._commandOptions + + if (!defaultTypeMapping || !this._self.#clientSideCache) { + const reply = await fn(); + return transformReply ? transformReply(reply, redisArgs.preserve) : reply; + } else { + const csc = this._self.#clientSideCache; + return await csc.handleCache(this._self, command, args, fn, transformReply, redisArgs.preserve); + } + } } static #createModuleCommand(command: Command, resp: RespVersions) { const transformReply = getTransformReply(command, resp); return async function (this: NamespaceProxyClient, ...args: Array) { - const redisArgs = command.transformArguments(...args), - reply = await this._self.sendCommand(redisArgs, this._self._commandOptions); - return transformReply ? - transformReply(reply, redisArgs.preserve) : - reply; + const redisArgs = command.transformArguments(...args); + const fn = () => { return this._self.sendCommand(redisArgs, this._self._commandOptions); }; + + const defaultTypeMapping = this._self._self.#options?.commandOptions === this._self._commandOptions + + if (!defaultTypeMapping || !this._self._self.#clientSideCache) { + const reply = await fn(); + return transformReply ? transformReply(reply, redisArgs.preserve) : reply; + } else { + const csc = this._self._self.#clientSideCache; + return await csc.handleCache(this._self._self, command, args, fn, transformReply, redisArgs.preserve); + } }; } @@ -173,14 +195,22 @@ export default class RedisClient< const prefix = functionArgumentsPrefix(name, fn), transformReply = getTransformReply(fn, resp); return async function (this: NamespaceProxyClient, ...args: Array) { - const fnArgs = fn.transformArguments(...args), - reply = await this._self.sendCommand( + const fnArgs = fn.transformArguments(...args); + const newFn = () => { return this._self.sendCommand( prefix.concat(fnArgs), this._self._commandOptions ); - return transformReply ? - transformReply(reply, fnArgs.preserve) : - reply; + }; + + const defaultTypeMapping = this._self._self.#options?.commandOptions === this._self._commandOptions + + if (!defaultTypeMapping || !this._self._self.#clientSideCache) { + const reply = await newFn(); + return transformReply ? transformReply(reply, fnArgs.preserve) : reply; + } else { + const csc = this._self._self.#clientSideCache; + return await csc.handleCache(this._self._self, fn, args, newFn, transformReply, fnArgs.preserve); + } }; } @@ -189,11 +219,18 @@ export default class RedisClient< transformReply = getTransformReply(script, resp); return async function (this: ProxyClient, ...args: Array) { const scriptArgs = script.transformArguments(...args), - redisArgs = prefix.concat(scriptArgs), - reply = await this.executeScript(script, redisArgs, this._commandOptions); - return transformReply ? - transformReply(reply, scriptArgs.preserve) : - reply; + redisArgs = prefix.concat(scriptArgs); + const fn = () => { return this.executeScript(script, redisArgs, this._commandOptions); }; + + const defaultTypeMapping = this._self._self.#options?.commandOptions === this._self._commandOptions + + if (!defaultTypeMapping || !this._self.#clientSideCache) { + const reply = await fn(); + return transformReply ? transformReply(reply, scriptArgs.preserve) : reply; + } else { + const csc = this._self.#clientSideCache; + return await csc.handleCache(this._self, script, args, fn, transformReply, scriptArgs.preserve); + } }; } @@ -279,6 +316,9 @@ export default class RedisClient< #monitorCallback?: MonitorCallback; private _self = this; private _commandOptions?: CommandOptions; + + #clientSideCache?: ClientSideCacheProvider; + #clearOnReconnect = false; get options(): RedisClientOptions | undefined { return this._self.#options; @@ -296,11 +336,26 @@ export default class RedisClient< return this._self.#queue.isPubSubActive; } + get socketEpoch() { + return this._self.#socket.socketEpoch; + } + constructor(options?: RedisClientOptions) { super(); + this.#options = this.#initiateOptions(options); this.#queue = this.#initiateQueue(); this.#socket = this.#initiateSocket(); + + if (options?.clientSideCache) { + if (options.clientSideCache instanceof ClientSideCacheProvider) { + this.#clientSideCache = options.clientSideCache; + } else { + const cscConfig = options.clientSideCache; + this.#clientSideCache = new BasicClientSideCache(cscConfig.ttl, cscConfig.maxEntries, cscConfig.lru); + } + this.#clearOnReconnect = this.#clientSideCache.clearOnReconnect(); + } } #initiateOptions(options?: RedisClientOptions): RedisClientOptions | undefined { @@ -334,7 +389,6 @@ export default class RedisClient< #handshake(selectedDB: number) { const commands = []; - if (this.#options?.RESP) { const hello: HelloOptions = {}; @@ -379,6 +433,13 @@ export default class RedisClient< ); } + if (this.#clientSideCache) { + const tracking = this.#clientSideCache.trackingOn(); + if (tracking.length > 0) { + commands.push(tracking); + } + } + return commands; } @@ -432,6 +493,9 @@ export default class RedisClient< }) .on('error', err => { this.emit('error', err); + if (this.#clearOnReconnect) { + this.#clientSideCache!.clear(); + } if (this.#socket.isOpen && !this.#options?.disableOfflineQueue) { this.#queue.flushWaitingForReply(err); } else { diff --git a/packages/client/lib/client/linked-list.ts b/packages/client/lib/client/linked-list.ts index ac1d021be91..29678f027b5 100644 --- a/packages/client/lib/client/linked-list.ts +++ b/packages/client/lib/client/linked-list.ts @@ -114,6 +114,7 @@ export class DoublyLinkedList { export interface SinglyLinkedNode { value: T; next: SinglyLinkedNode | undefined; + removed: boolean; } export class SinglyLinkedList { @@ -140,7 +141,8 @@ export class SinglyLinkedList { const node = { value, - next: undefined + next: undefined, + removed: false }; if (this.#head === undefined) { @@ -151,6 +153,9 @@ export class SinglyLinkedList { } remove(node: SinglyLinkedNode, parent: SinglyLinkedNode | undefined) { + if (node.removed) { + throw new Error("node already removed"); + } --this.#length; if (this.#head === node) { @@ -165,6 +170,8 @@ export class SinglyLinkedList { } else { parent!.next = node.next; } + + node.removed = true; } shift() { @@ -177,6 +184,7 @@ export class SinglyLinkedList { this.#head = node.next; } + node.removed = true; return node.value; } diff --git a/packages/client/lib/client/pool.ts b/packages/client/lib/client/pool.ts index fc996e07625..27cb3fa095e 100644 --- a/packages/client/lib/client/pool.ts +++ b/packages/client/lib/client/pool.ts @@ -7,6 +7,7 @@ import { TimeoutError } from '../errors'; import { attachConfig, functionArgumentsPrefix, getTransformReply, scriptArgumentsPrefix } from '../commander'; import { CommandOptions } from './commands-queue'; import RedisClientMultiCommand, { RedisClientMultiCommandType } from './multi-command'; +import { BasicPooledClientSideCache, ClientSideCacheConfig, PooledClientSideCacheProvider, PooledNoRedirectClientSideCache, PooledRedirectClientSideCache } from './cache'; export interface RedisPoolOptions { /** @@ -25,6 +26,10 @@ export interface RedisPoolOptions { * TODO */ cleanupDelay: number; + /** + * TODO + */ + clientSideCache?: PooledClientSideCacheProvider | ClientSideCacheConfig; } export type PoolTask< @@ -58,12 +63,13 @@ export class RedisClientPool< RESP extends RespVersions = 2, TYPE_MAPPING extends TypeMapping = {} > extends EventEmitter { + // TODO: for CSC will have to be modified, to directly call function on underlying chosen client static #createCommand(command: Command, resp: RespVersions) { const transformReply = getTransformReply(command, resp); return async function (this: ProxyPool, ...args: Array) { const redisArgs = command.transformArguments(...args), reply = await this.sendCommand(redisArgs, this._commandOptions); - return transformReply ? + return transformReply ? transformReply(reply, redisArgs.preserve) : reply; }; @@ -133,7 +139,7 @@ export class RedisClientPool< // returning a "proxy" to prevent the namespaces._self to leak between "proxies" return Object.create( new Pool( - RedisClient.factory(clientOptions).bind(undefined, clientOptions), + clientOptions, options ) ) as RedisClientPoolType; @@ -207,22 +213,39 @@ export class RedisClientPool< return this._self.#isClosing; } + #clientSideCache?: PooledClientSideCacheProvider; + /** * You are probably looking for {@link RedisClient.createPool `RedisClient.createPool`}, * {@link RedisClientPool.fromClient `RedisClientPool.fromClient`}, * or {@link RedisClientPool.fromOptions `RedisClientPool.fromOptions`}... */ constructor( - clientFactory: () => RedisClientType, + clientOptions?: RedisClientOptions, options?: Partial ) { super(); - this.#clientFactory = clientFactory; this.#options = { ...RedisClientPool.#DEFAULTS, ...options }; + if (options?.clientSideCache) { + if (clientOptions === undefined) { + clientOptions = {}; + } + + if (options.clientSideCache instanceof PooledClientSideCacheProvider) { + this.#clientSideCache = clientOptions.clientSideCache = options.clientSideCache; + } else { + const cscConfig = options.clientSideCache; + this.#clientSideCache = clientOptions.clientSideCache = new BasicPooledClientSideCache(cscConfig.ttl, cscConfig.maxEntries, cscConfig.lru); + this.#clientSideCache = clientOptions.clientSideCache = new PooledNoRedirectClientSideCache(cscConfig.ttl, cscConfig.maxEntries, cscConfig.lru); + this.#clientSideCache = clientOptions.clientSideCache = new PooledRedirectClientSideCache(cscConfig.ttl, cscConfig.maxEntries, cscConfig.lru); + } + } + + this.#clientFactory = RedisClient.factory(clientOptions).bind(undefined, clientOptions) as () => RedisClientType; } private _self = this; @@ -286,9 +309,29 @@ export class RedisClientPool< async connect() { if (this._self.#isOpen) return; // TODO: throw error? - this._self.#isOpen = true; + if (this._self.#clientSideCache instanceof PooledRedirectClientSideCache) { + const client = this._self.#clientFactory(); + const cache = this._self.#clientSideCache; + cache.setRedirectClient(client); + client.on("error", () => { + cache.disable(); + cache.clear(); + }).on("ready", async () => { + const clientId = await client.clientId() as number; + cache.updateRedirect(clientId); + cache.enable(); + }) + + try { + await client.connect(); + } catch (err) { + this.destroy(); + throw err; + } + } + const promises = []; while (promises.length < this._self.#options.minimum) { promises.push(this._self.#create()); @@ -296,21 +339,27 @@ export class RedisClientPool< try { await Promise.all(promises); - return this as unknown as RedisClientPoolType; } catch (err) { this.destroy(); throw err; } + + return this as unknown as RedisClientPoolType; } - async #create() { + async #create(redirect?: boolean) { const node = this._self.#clientsInUse.push( this._self.#clientFactory() .on('error', (err: Error) => this.emit('error', err)) ); try { - await node.value.connect(); + const client = node.value; + if (this._self.#clientSideCache) { + this._self.#clientSideCache.addClient(node.value); + } + + await client.connect(); } catch (err) { this._self.#clientsInUse.remove(node); throw err; @@ -399,7 +448,9 @@ export class RedisClientPool< const toDestroy = Math.min(this.#idleClients.length, this.totalClients - this.#options.minimum); for (let i = 0; i < toDestroy; i++) { // TODO: shift vs pop - this.#idleClients.shift()!.destroy(); + const client = this.#idleClients.shift()! + this.#clientSideCache?.removeClient(client); + client.destroy(); } } @@ -444,6 +495,10 @@ export class RedisClientPool< for (const client of this._self.#clientsInUse) { promises.push(client.close()); } + + if (this.#clientSideCache instanceof PooledRedirectClientSideCache) { + promises.push(this.#clientSideCache.close()); + } await Promise.all(promises); @@ -465,6 +520,11 @@ export class RedisClientPool< for (const client of this._self.#clientsInUse) { client.destroy(); } + + if (this.#clientSideCache instanceof PooledRedirectClientSideCache) { + this.#clientSideCache.destroy(); + } + this._self.#clientsInUse.reset(); this._self.#isOpen = false; diff --git a/packages/client/lib/client/socket.ts b/packages/client/lib/client/socket.ts index 753ced6c5ed..a6837e00619 100644 --- a/packages/client/lib/client/socket.ts +++ b/packages/client/lib/client/socket.ts @@ -84,6 +84,12 @@ export default class RedisSocket extends EventEmitter { #isSocketUnrefed = false; + #socketEpoch = 0; + + get socketEpoch() { + return this.#socketEpoch; + } + constructor(initiator: RedisSocketInitiator, options?: RedisSocketOptions) { super(); @@ -151,6 +157,7 @@ export default class RedisSocket extends EventEmitter { throw err; } this.#isReady = true; + this.#socketEpoch++; this.emit('ready'); } catch (err) { const retryIn = this.#shouldReconnect(retries++, err as Error); diff --git a/packages/client/lib/commands/GET.ts b/packages/client/lib/commands/GET.ts index bb3db4f76d9..9f0979cd645 100644 --- a/packages/client/lib/commands/GET.ts +++ b/packages/client/lib/commands/GET.ts @@ -6,5 +6,11 @@ export default { transformArguments(key: RedisArgument) { return ['GET', key]; }, + getCacheInfo(key: RedisArgument) { + return { + cacheKey: `get_${key.toString()}`, + redisKeys: [key.toString()] + } + }, transformReply: undefined as unknown as () => BlobStringReply | NullReply } as const satisfies Command; From b5710ce1475178129b761746a350d7b09f8335d0 Mon Sep 17 00:00:00 2001 From: Shaya Potter Date: Fri, 2 Feb 2024 00:13:45 +0200 Subject: [PATCH 2/3] readme --- packages/client/lib/client/README-cache.md | 64 ++++++++++++++++++++++ 1 file changed, 64 insertions(+) create mode 100644 packages/client/lib/client/README-cache.md diff --git a/packages/client/lib/client/README-cache.md b/packages/client/lib/client/README-cache.md new file mode 100644 index 00000000000..01a43cf4594 --- /dev/null +++ b/packages/client/lib/client/README-cache.md @@ -0,0 +1,64 @@ +# Client Side Caching Support + +Client Side Caching enables Redis Servers and Clients to work together to enable a client to cache results from command sent to a server and be informed by the server when the cached result is no longer valid. + +## Usage + +node-redis supports two ways of instantiating client side caching support + +Note: Client Side Caching is only supported with RESP3. + +### Anonymous Cache + +```javascript +const client = createClient({RESP: 3, clientSideCache: {ttl: 0, maxEntries: 0, lru: false}}) +``` + +In this instance, the cache is opaque to the user, and they have no control over it. + +### Controllable Cache + +```javascript +const ttl = 0, maxEntries = 0, lru = false; +const cache = new BasicClientSideCache(ttl, maxEntries, lru); +const client = createClient({RESP: 3, clientSideCache: cache}); +``` + +In this instance, the user has full control over the cache, as they have access to the cache object. + +They can manually invalidate keys + +```javascript +cache.invalidate(key); +``` + +they can clear the entire cache +g +```javascript +cache.clear(); +``` + +as well as get cache metrics + +```typescript +const hits: number = cache.cacheHits(); +const misses: number = cache.cacheMisses(); +``` + +## Pooled Caching + +Similar to individual clients, node-redis also supports caching for its pooled client object, with the cache being able to be instantiated in an anonymous manner or a controllable manner. + +### Anonymous Cache + +```javascript +const client = createClientPool({RESP: 3}, {clientSideCache: {ttl: 0, maxEntries: 0, lru: false}, minimum: 8}); +``` + +### Controllable Cache + +```javascript +const ttl = 0, maxEntries = 0, lru = false; +const cache = new BasicPooledClientSideCache(ttl, maxEntries, lru); +const client = createClientPool({RESP: 3}, {clientSideCache: cache, minimum: 8}); +``` \ No newline at end of file From 43363744e862ac9bb50af147cecf6636aab28e22 Mon Sep 17 00:00:00 2001 From: Shaya Potter Date: Tue, 30 Jan 2024 19:27:44 +0200 Subject: [PATCH 3/3] just a hack for testing pass through --- packages/client/lib/RESP/types.ts | 1 + packages/client/lib/client/pool.ts | 10 ++++++++-- packages/client/lib/commands/GET.ts | 1 + 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/packages/client/lib/RESP/types.ts b/packages/client/lib/RESP/types.ts index 3869eede671..a59e9618924 100644 --- a/packages/client/lib/RESP/types.ts +++ b/packages/client/lib/RESP/types.ts @@ -269,6 +269,7 @@ export interface CacheInfo { } export type Command = { + name?: () => string; FIRST_KEY_INDEX?: number | ((this: void, ...args: Array) => RedisArgument | undefined); IS_READ_ONLY?: boolean; /** diff --git a/packages/client/lib/client/pool.ts b/packages/client/lib/client/pool.ts index 27cb3fa095e..a85070d0e4c 100644 --- a/packages/client/lib/client/pool.ts +++ b/packages/client/lib/client/pool.ts @@ -63,13 +63,19 @@ export class RedisClientPool< RESP extends RespVersions = 2, TYPE_MAPPING extends TypeMapping = {} > extends EventEmitter { - // TODO: for CSC will have to be modified, to directly call function on underlying chosen client static #createCommand(command: Command, resp: RespVersions) { + // TODO: for CSC will have to be modified, to directly call function on underlying chosen client + // this is .name() is just a hack for now. + const name = command.name?.(); const transformReply = getTransformReply(command, resp); return async function (this: ProxyPool, ...args: Array) { + if (name) { + return this.execute((client => { return client[name](...args)})) + } + const redisArgs = command.transformArguments(...args), reply = await this.sendCommand(redisArgs, this._commandOptions); - return transformReply ? + return transformReply ? transformReply(reply, redisArgs.preserve) : reply; }; diff --git a/packages/client/lib/commands/GET.ts b/packages/client/lib/commands/GET.ts index 9f0979cd645..c2cf5e49127 100644 --- a/packages/client/lib/commands/GET.ts +++ b/packages/client/lib/commands/GET.ts @@ -1,6 +1,7 @@ import { RedisArgument, BlobStringReply, NullReply, Command } from '../RESP/types'; export default { + name: () => { return "GET" }, FIRST_KEY_INDEX: 1, IS_READ_ONLY: true, transformArguments(key: RedisArgument) {