From 36623ad3f935d35df851ac4a2ff31796c6dab0a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20Levente=20T=C3=B3th?= Date: Tue, 15 Aug 2023 18:08:35 +0200 Subject: [PATCH] fix: time resolution --- src/streaming-feed.ts | 201 ------------------------ src/streaming-feed/index.ts | 2 + src/utils.ts | 2 +- test/integration/streaming-feed.spec.ts | 18 +-- 4 files changed, 12 insertions(+), 211 deletions(-) delete mode 100644 src/streaming-feed.ts diff --git a/src/streaming-feed.ts b/src/streaming-feed.ts deleted file mode 100644 index 3798ff7..0000000 --- a/src/streaming-feed.ts +++ /dev/null @@ -1,201 +0,0 @@ -import { BatchId, Bee, Reference, Signer, Topic, Utils } from '@ethersphere/bee-js' -import { makeTopic } from './feed' -import { getCurrentTime, getIndexForArbitraryTime } from './getIndexForArbitraryTime' -import { - assembleSocPayload, - mapSocToFeed, - StreamingFeedChunk, - IStreamingFeed, - SwarmStreamingFeedR, - SwarmStreamingFeedRW, - FaultTolerantStreamType, -} from './streaming' -import { ChunkReference, makeSigner, writeUint64BigEndian } from './utils' - -const { Hex } = Utils -const { hexToBytes } = Hex - -export class StreamingFeed implements IStreamingFeed { - public constructor(public readonly bee: Bee, public type: FaultTolerantStreamType = 'fault-tolerant-stream') {} - - /** - * Creates a streaming feed reader - * @param topic a swarm topic - * @param owner owner - * @returns a streaming feed reader - */ - public makeFeedR( - topic: Topic | Uint8Array | string, - owner: Utils.Eth.EthAddress | Uint8Array | string, - ): SwarmStreamingFeedR { - const socReader = this.bee.makeSOCReader(owner) - const topicHex = makeTopic(topic) - const topicBytes = hexToBytes<32>(topicHex) - const ownerHex = Utils.Eth.makeHexEthAddress(owner) - - /** - * Gets the last index in the feed - * @returns An index number - */ - const getLastIndex = async (initialTime: number, updatePeriod: number): Promise => { - const lookupTime = getCurrentTime() - - return getIndexForArbitraryTime(lookupTime, initialTime, updatePeriod) - } - - /** - * Gets the last appended chunk in the feed - * @returns A feed chunk - */ - const findLastUpdate = async (initialTime: number, updatePeriod: number): Promise => { - return getUpdate(initialTime, updatePeriod) - } - - /** - * Download Feed Chunk at Specific Time - * @param initialTime initial time of streaming feed - * @param updatePeriod streaming feed frequency in milliseconds - * @param lookupTime lookup time - * @returns a StreamingFeedChunk object - */ - const getUpdate = async ( - initialTime: number, - updatePeriod: number, - lookupTime?: number, - ): Promise => { - lookupTime = lookupTime ?? getCurrentTime() - const index = getIndexForArbitraryTime(lookupTime, initialTime, updatePeriod) - const socChunk = await socReader.download(this.getIdentifier(topicBytes, index)) - - return mapSocToFeed(socChunk) - } - - /** - * Download all feed chunks - * @param initialTime initial time of streaming feed - * @param updatePeriod streaming feed frequency in milliseconds - * @returns a StreamingFeedChunk array object - */ - const getUpdates = async (initialTime: number, updatePeriod: number): Promise => { - const feeds: StreamingFeedChunk[] = [] - - try { - let index = getIndexForArbitraryTime(getCurrentTime(), initialTime, updatePeriod) - - index-- - - let lookupTime = getCurrentTime() - let feed - while (index > -1) { - // throws - const socChunk = await socReader.download(this.getIdentifier(topicBytes, index)) - feed = mapSocToFeed(socChunk) - lookupTime -= feed.updatePeriod - - feeds.push(feed) - index = getIndexForArbitraryTime(lookupTime, initialTime, updatePeriod) - index-- - } - - return feeds - } catch (e) { - return feeds - } - } - - return { - type: 'fault-tolerant-stream', - owner: ownerHex, - topic: topicHex, - getIndexForArbitraryTime, - getUpdate, - getUpdates, - findLastUpdate, - getLastIndex, - } - } - - /** - * Creates a streaming feed reader / writer - * @param topic a swarm topic - * @param signer signer - * @returns a streaming feed reader / writer - */ - public makeFeedRW(topic: string | Topic | Uint8Array, signer: string | Uint8Array | Signer): SwarmStreamingFeedRW { - const canonicalSigner = makeSigner(signer) - const topicHex = makeTopic(topic) - const topicBytes = hexToBytes<32>(topicHex) - const feedR = this.makeFeedR(topic, canonicalSigner.address) - const socWriter = this.bee.makeSOCWriter(canonicalSigner) - - /** - * Sets the upload chunk to update - * @param index the chunk index to update - * @param postageBatchId swarm postage batch id - * @param reference chunk reference - * @param initialTime initial time of streaming feed - * @param updatePeriod streaming feed frequency in milliseconds - * @param lookupTime lookup time - * @returns a chunk reference - */ - const setUpdate = async ( - index: number, - postageBatchId: string | BatchId, - reference: Reference, - initialTime: number, - updatePeriod: number, - ): Promise => { - const identifier = this.getIdentifier(topicBytes, index) - - return socWriter.upload( - postageBatchId, - identifier, - assembleSocPayload(hexToBytes(reference) as ChunkReference, { - at: initialTime, - updatePeriod, - index, - }), - ) - } - - /** - * Sets the next upload chunk - * @param postageBatchId swarm postage batch id - * @param reference chunk reference - * @param initialTime initial time of streaming feed - * @param updatePeriod streaming feed frequency in milliseconds - * @param lookupTime lookup time - * @returns a chunk reference - */ - const setLastUpdate = async ( - postageBatchId: string | BatchId, - reference: Reference, - initialTime: number, - updatePeriod: number, - lookupTime: number, - ): Promise => { - lookupTime = lookupTime ?? getCurrentTime() - const lastIndex = feedR.getIndexForArbitraryTime(lookupTime, initialTime, updatePeriod) - - return setUpdate(lastIndex, postageBatchId, reference, initialTime, updatePeriod) - } - - return { - ...feedR, - setLastUpdate, - setUpdate, - } - } - - /** - * Get Single Owner Chunk identifier - * @param topic a swarm topic, bytes 32 length - * @param index the chunk index - * @returns a bytes 32 - */ - public getIdentifier(topic: Utils.Bytes.Bytes<32>, index: number): Utils.Bytes.Bytes<32> { - const indexBytes = writeUint64BigEndian(index) - - return Utils.keccak256Hash(topic, indexBytes) - } -} diff --git a/src/streaming-feed/index.ts b/src/streaming-feed/index.ts index 209a64b..db59cba 100644 --- a/src/streaming-feed/index.ts +++ b/src/streaming-feed/index.ts @@ -56,12 +56,14 @@ export class StreamingFeed implements IStreamingFeed { * @param initialTime initial time of streaming feed * @param updatePeriod streaming feed frequency in milliseconds * @param lookupTime lookup time + * @param discover boolean, indicates whether the algorithm will look for the closest successful hit * @returns a StreamingFeedChunk object */ const getUpdate = async ( initialTime: number, updatePeriod: number, lookupTime?: number, + discover = true, ): Promise => { lookupTime = lookupTime ?? getCurrentTime() const index = getIndexForArbitraryTime(lookupTime, initialTime, updatePeriod) diff --git a/src/utils.ts b/src/utils.ts index 7a74e0f..27282d6 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -194,4 +194,4 @@ export function isObject(value: unknown): value is Record { return value !== null && typeof value === 'object' } -export const getCurrentTime = (d = new Date()): number => d.getTime() +export const getCurrentTime = (d = new Date()): number => Math.floor(d.getTime() / 1000) // ms to s diff --git a/test/integration/streaming-feed.spec.ts b/test/integration/streaming-feed.spec.ts index ee13cd2..35a12f8 100644 --- a/test/integration/streaming-feed.spec.ts +++ b/test/integration/streaming-feed.spec.ts @@ -30,7 +30,7 @@ describe('streaming feed', () => { const feedRw = streamingFeed.makeFeedRW(topic, signer) const initialTime = getCurrentTime() - const updatePeriod = 5000 + const updatePeriod = 5 const testReference: Reference = '0000000000000000000000000000000000000000000000000000000000000126' as HexString<64> await feedRw.setLastUpdate(batchId, testReference, initialTime, updatePeriod) @@ -45,7 +45,7 @@ describe('streaming feed', () => { const feedRw = streamingFeed.makeFeedRW(topic, signer) const initialTime = getCurrentTime() - const updatePeriod = 5000 + const updatePeriod = 5 const feedUpdate = await feedRw.findLastUpdate(initialTime, updatePeriod) @@ -71,7 +71,7 @@ describe('streaming feed', () => { const random = new Date().getTime().toString().padStart(64, '0') const multipleUpdateTopic = Utils.Hex.makeHexString(random) as Topic - const updatePeriod = 5000 + const updatePeriod = 5 const feedRw = streamingFeed.makeFeedRW(multipleUpdateTopic, signer) const numUpdates = 5 @@ -93,11 +93,11 @@ describe('streaming feed', () => { } const feedUpdateResponse = await feedRw.getUpdates(initialTime, updatePeriod) - expect(feedUpdateResponse.length).toEqual(5) - expect(feedUpdateResponse[0].updatePeriod).toEqual(5000) - expect(feedUpdateResponse[1].updatePeriod).toEqual(5000) - expect(feedUpdateResponse[2].updatePeriod).toEqual(5000) - expect(feedUpdateResponse[3].updatePeriod).toEqual(5000) - expect(feedUpdateResponse[4].updatePeriod).toEqual(5000) + expect(feedUpdateResponse.length).toEqual(numUpdates) + expect(feedUpdateResponse[0].updatePeriod).toEqual(updatePeriod) + expect(feedUpdateResponse[1].updatePeriod).toEqual(updatePeriod) + expect(feedUpdateResponse[2].updatePeriod).toEqual(updatePeriod) + expect(feedUpdateResponse[3].updatePeriod).toEqual(updatePeriod) + expect(feedUpdateResponse[4].updatePeriod).toEqual(updatePeriod) }, 45000) })