From 9c45d4ad68dba8396f2cde79937c43eb63dbc1a6 Mon Sep 17 00:00:00 2001 From: Rogelio Morrell Date: Tue, 5 Jul 2022 11:52:57 -0500 Subject: [PATCH 01/21] feat(feed): streaming feed --- jest.config.ts | 2 +- src/feed.ts | 4 +- src/streaming-feed.ts | 149 +++++++++++++++++++++++ src/streaming.ts | 139 +++++++++++++++++++++ test/integration/sequential-feed.spec.ts | 6 +- test/integration/streaming-feed.spec.ts | 79 ++++++++++++ 6 files changed, 374 insertions(+), 5 deletions(-) create mode 100644 src/streaming-feed.ts create mode 100644 src/streaming.ts create mode 100644 test/integration/streaming-feed.spec.ts diff --git a/jest.config.ts b/jest.config.ts index ff906be..4277bd0 100644 --- a/jest.config.ts +++ b/jest.config.ts @@ -11,7 +11,7 @@ export default async (): Promise => { // This will setup the prerequisites for the tests to run globalSetup: './tests-setup.ts', - + testTimeout: 360000, // The directory where Jest should output its coverage files coverageDirectory: 'coverage', diff --git a/src/feed.ts b/src/feed.ts index ede155d..997d926 100644 --- a/src/feed.ts +++ b/src/feed.ts @@ -14,7 +14,7 @@ import { writeUint64BigEndian, } from './utils' -export const FEED_TYPES = ['sequential', 'fault-tolarent-stream'] as const +export const FEED_TYPES = ['sequential', 'fault-tolerant-stream'] as const export type FeedData = { timestamp: number @@ -26,7 +26,7 @@ export type FeedType = typeof FEED_TYPES[number] export type FeedIndex = T extends 'sequential' ? number - : T extends 'fault-tolarent-stream' + : T extends 'fault-tolerant-stream' ? number : never diff --git a/src/streaming-feed.ts b/src/streaming-feed.ts new file mode 100644 index 0000000..2fd8f8f --- /dev/null +++ b/src/streaming-feed.ts @@ -0,0 +1,149 @@ +import { BatchId, Bee, Reference, Signer, Topic, Utils } from '@ethersphere/bee-js' +import { FeedType } from './feed' +import { + assembleSocPayload, + makeTopic, + mapSocToFeed, + StreamingFeedChunk, + SwarmStreamingFeed, + SwarmStreamingFeedR, + SwarmStreamingFeedRW, +} from './streaming' +import { ChunkReference, makeSigner, writeUint64BigEndian } from './utils' + +const { Hex } = Utils +const { hexToBytes } = Hex +export const getCurrentTime = (d = new Date()) => d.getTime() +export class StreamingFeed implements SwarmStreamingFeed { + public readonly type: FeedType + + public constructor(public readonly bee: Bee) { + this.type = 'fault-tolerant-stream' + } + + public makeFeedR( + topic: Topic | Uint8Array | string, + owner: Utils.Eth.EthAddress | Uint8Array | string, + ): SwarmStreamingFeedR { + const socReader = this.bee.makeSOCReader(owner) + const topicHex = makeTopic(topic) + const topicBytes = hexToBytes<32>(topicHex) + const ownerHex = Utils.Eth.makeHexEthAddress(owner) + + const getIndexForArbitraryTime = async ( + lookupTime: number, + initialTime: number, + updatePeriod: number, + ): Promise => { + const currentTime = getCurrentTime() // Tp + try { + // the nearest last index to an arbitrary time (Tx) where T0 <= Tx <= Tn <= Tp + if (currentTime >= initialTime && lookupTime >= initialTime) { + return Math.floor((lookupTime - initialTime) / updatePeriod) + } + } catch (e) { + // no-op + } + + return -1 + } + + // Download Feed Chunk at Specific Time + const getUpdate = async ( + initialTime: number, + updatePeriod: number, + lookupTime?: number, + ): Promise => { + lookupTime = lookupTime ?? getCurrentTime() + const index = await getIndexForArbitraryTime(lookupTime, initialTime, updatePeriod) + const socChunk = await socReader.download(this.getIdentifier(topicBytes, index)) + + return mapSocToFeed(socChunk) + } + + // Download Feed Stream + const getUpdates = async (initialTime: number, updatePeriod: number): Promise => { + const feeds: StreamingFeedChunk[] = [] + // while from last to first, use lookupTime = chunk.timestamp + 1 + + const index = await getIndexForArbitraryTime(getCurrentTime(), initialTime, updatePeriod) + const socChunk = await socReader.download(this.getIdentifier(topicBytes, index - 1)) + + let feed = await mapSocToFeed(socChunk) + let i = 1 + feeds.push(feed) + while (i < index) { + feed = await getUpdate(initialTime, updatePeriod, feed.timestamp + updatePeriod + 1) + i++ + feeds.push(feed) + } + + return feeds.reverse() + } + + return { + type: 'fault-tolerant-stream', + owner: ownerHex, + topic: topicHex, + getIndexForArbitraryTime, + getUpdate, + getUpdates, + } + } + + public makeFeedRW( + topic: string | Topic | Uint8Array, + signer: string | Uint8Array | Signer, + ): SwarmStreamingFeedRW { + const canonicalSigner = makeSigner(signer) + const topicHex = makeTopic(topic) + const topicBytes = hexToBytes<32>(topicHex) + const feedR = this.makeFeedR(topic, canonicalSigner.address) + const socWriter = this.bee.makeSOCWriter(canonicalSigner) + + const _setUpdate = async ( + index: number, + postageBatchId: string | BatchId, + reference: Reference, + initialTime: number, + updatePeriod: number, + ): Promise => { + const identifier = this.getIdentifier(topicBytes, index) + + return socWriter.upload( + postageBatchId, + identifier, + assembleSocPayload(hexToBytes(reference) as ChunkReference, { + at: initialTime, + updatePeriod, + index, + }), + ) + } + + const setLastUpdate = async ( + postageBatchId: string | BatchId, + reference: Reference, + initialTime: number, + updatePeriod: number, + lookupTime: number, + ): Promise => { + lookupTime = lookupTime ?? getCurrentTime() + const lastIndex = await feedR.getIndexForArbitraryTime(lookupTime, initialTime, updatePeriod) + + return _setUpdate(lastIndex, postageBatchId, reference, initialTime, updatePeriod) + } + + return { + ...feedR, + setLastUpdate, + } + } + + /** Get Single Owner Chunk identifier */ + public getIdentifier(topic: Utils.Bytes.Bytes<32>, index: number): Utils.Bytes.Bytes<32> { + const indexBytes = writeUint64BigEndian(index) + + return Utils.keccak256Hash(topic, indexBytes) + } +} diff --git a/src/streaming.ts b/src/streaming.ts new file mode 100644 index 0000000..3fd90cc --- /dev/null +++ b/src/streaming.ts @@ -0,0 +1,139 @@ +import { BatchId, Bee, Reference, Signer, Topic, Utils } from '@ethersphere/bee-js' +import type { SingleOwnerChunk } from '@ethersphere/bee-js/dist/src/chunk/soc' +import type { ChunkReference } from '@ethersphere/bee-js/dist/src/feed' +import type { EthAddress } from '@ethersphere/bee-js/dist/src/utils/eth' +import { FeedType, SwarmFeedHandler } from './feed' +import { + assertBytes, + Bytes, + bytesToHex, + hexToBytes, + readUint64BigEndian, + serializeBytes, + TOPIC_BYTES_LENGTH, + TOPIC_HEX_LENGTH, + writeUint64BigEndian, +} from './utils' + +export type StreamingFeedData = { + timestamp: number + reference: ChunkReference + updatePeriod: number + chunkIndex: number +} + +export interface StreamingFeedChunk extends SingleOwnerChunk { + index: Index + reference: Reference + timestamp: number + updatePeriod: number +} + +/** Interface for feed type classes */ +export interface SwarmStreamingFeed { + /** Feed type identifier */ + readonly type: FeedType + /** initialised BeeJS instance */ + readonly bee: Bee + /** get Feed interface with read operations */ + makeFeedR( + topic: Topic | Uint8Array | string, + owner: EthAddress | Uint8Array | string, + ...options: any[] + ): SwarmStreamingFeedR + /** get Feed interface with write and read operations */ + makeFeedRW( + topic: Topic | Uint8Array | string, + signer: Signer | Uint8Array | string, + options?: any, + ): SwarmStreamingFeedRW + /** Get Single Owner Chunk identifier */ + getIdentifier(topic: Bytes<32>, index: Index): Bytes<32> +} + +/** Swarm Feed Read operations */ +export interface SwarmStreamingFeedR extends SwarmFeedHandler { + getIndexForArbitraryTime(lookupTime: number, initialTime?: number, updatePeriod?: number): Promise | Index + getUpdate(initialTime: number, updatePeriod: number, lookupTime?: Index): Promise> + getUpdates(initialTime: number, updatePeriod: number): Promise[]> +} + +/** Swarm Feed Read and operations */ +export interface SwarmStreamingFeedRW extends SwarmStreamingFeedR { + setLastUpdate( + postageBatchId: string | BatchId, + reference: Reference, + initialTime: number, + updatePeriod: number, + lookupTime?: number, + ): Promise +} + +export function extractDataFromSocPayload(payload: Uint8Array): StreamingFeedData { + const index = readUint64BigEndian(payload.slice(0, 8) as Bytes<8>) + const updatePeriod = readUint64BigEndian(payload.slice(8, 16) as Bytes<8>) + const timestamp = readUint64BigEndian(payload.slice(16, 24) as Bytes<8>) + const p = payload.slice(24) // 32 bytes + + if (p.length === 32 || p.length === 64) { + return { + timestamp, + updatePeriod, + chunkIndex: index, + reference: p as ChunkReference, + } + } + + // TODO handle JSON-like metadata + throw new Error('NotImplemented: payload is longer than expected') +} + +export function mapSocToFeed(socChunk: SingleOwnerChunk): StreamingFeedChunk { + const { reference, timestamp, updatePeriod, chunkIndex } = extractDataFromSocPayload(socChunk.payload()) + + return { + ...socChunk, + index: chunkIndex as unknown as Index, + timestamp, + updatePeriod, + reference: bytesToHex(reference), + } +} + +export function assembleSocPayload( + reference: ChunkReference, + options?: { at?: number; updatePeriod?: number; index?: number }, +): Uint8Array { + const at = options?.at ?? Date.now() / 1000.0 + const timestamp = writeUint64BigEndian(at) + const updatePeriod = writeUint64BigEndian(options?.updatePeriod ?? 0) + const chunkIndex = writeUint64BigEndian(options?.index ?? -1) + + return serializeBytes(chunkIndex, updatePeriod, timestamp, reference) +} + +/** Converts feedIndex response to integer */ +export function fetchIndexToInt(fetchIndex: string): number { + const indexBytes = hexToBytes(fetchIndex) + let index = 0 + for (let i = indexBytes.length - 1; i >= 0; i--) { + const byte = indexBytes[i] + + if (byte === 0) break + + index += byte + } + + return index +} + +export function makeTopic(topic: Uint8Array | string): Topic { + if (typeof topic === 'string') { + return Utils.Hex.makeHexString(topic, TOPIC_HEX_LENGTH) + } else if (topic instanceof Uint8Array) { + assertBytes<32>(topic, TOPIC_BYTES_LENGTH) + + return bytesToHex(topic, TOPIC_HEX_LENGTH) + } + throw new TypeError('invalid topic') +} diff --git a/test/integration/sequential-feed.spec.ts b/test/integration/sequential-feed.spec.ts index 5acc38e..97802bf 100644 --- a/test/integration/sequential-feed.spec.ts +++ b/test/integration/sequential-feed.spec.ts @@ -39,10 +39,12 @@ describe('feed', () => { }, 21000) test('multiple updates using setUpdate and lookup', async () => { - const reference = Utils.Hex.makeHexString('0000000000000000000000000000000000000000000000000000000000000000', 64) + const reference = Utils.Hex.makeHexString(new Date().getTime().toString().padStart(64, '0'), 64) const referenceBytes = hexToBytes(reference) assertBytes(referenceBytes, 32) - const multipleUpdateTopic = '3000000000000000000000000000000000000000000000000000000000000000' as Topic + const random = new Date().getTime().toString().padStart(64, '0') + const multipleUpdateTopic = Utils.Hex.makeHexString(random) as Topic + const feedRw = sequentialFeed.makeFeedRW(multipleUpdateTopic, signer) const lastIndex = await feedRw.getLastIndex() const nextIndex = lastIndex === -1 ? 0 : lastIndex + 1 diff --git a/test/integration/streaming-feed.spec.ts b/test/integration/streaming-feed.spec.ts new file mode 100644 index 0000000..acc533a --- /dev/null +++ b/test/integration/streaming-feed.spec.ts @@ -0,0 +1,79 @@ +import { Bee, Reference, Topic, Utils } from '@ethersphere/bee-js' +import { getCurrentTime, StreamingFeed } from '../../src/streaming-feed' +import { assertBytes, Bytes, bytesToHex, HexString, hexToBytes, makePrivateKeySigner } from '../../src/utils' +import { beeUrl, getPostageBatch } from '../utils' +jest.setTimeout(360 * 1000) +describe('streaming feed', () => { + const testIdentity = { + privateKey: '634fb5a872396d9693e5c9f9d7233cfa93f395c093371017ff44aa9ae6564cdd' as HexString, + publicKey: '03c32bb011339667a487b6c1c35061f15f7edc36aa9a0f8648aba07a4b8bd741b4' as HexString, + address: '8d3766440f0d7b949a5e32995d09619a7f86e632' as HexString, + } + const random = new Date().getTime().toString().padStart(64, '0') + const topic = Utils.Hex.makeHexString(random) as Topic + const owner = Utils.Hex.makeHexString(testIdentity.address, 40) + const signer = makePrivateKeySigner(hexToBytes(testIdentity.privateKey) as Bytes<32>) + const bee = new Bee(beeUrl()) + const batchId = getPostageBatch() + const streamingFeed = new StreamingFeed(bee) + + test('lookup for empty feed update', async () => { + const emptyTopic = '1200000000000000000000000000000000000000000000000000000000000001' as Topic + const feedR = streamingFeed.makeFeedR(emptyTopic, testIdentity.address) + const lastIndex = await feedR.getIndexForArbitraryTime(getCurrentTime()) + + expect(lastIndex).toBe(-1) + }, 40000) + + test('setLastupdate then lookup', async () => { + const feedRw = streamingFeed.makeFeedRW(topic, signer) + + const initialTime = getCurrentTime() + const updatePeriod = 5000 + const testReference: Reference = '0000000000000000000000000000000000000000000000000000000000000126' as HexString<64> + await feedRw.setLastUpdate(batchId, testReference, initialTime, updatePeriod) + + const feedUpdate = await feedRw.getUpdate(initialTime, updatePeriod) + const lastIndex = await feedRw.getIndexForArbitraryTime(getCurrentTime(), initialTime, updatePeriod) + + expect(feedUpdate.index).toEqual(lastIndex) + expect(bytesToHex(feedUpdate.owner())).toEqual(owner) + }, 21000) + + test('multiple updates using setUpdate and lookup', async () => { + const reference = Utils.Hex.makeHexString(new Date().getTime().toString().padStart(64, '0'), 64) + const referenceBytes = hexToBytes(reference) + assertBytes(referenceBytes, 32) + const random = new Date().getTime().toString().padStart(64, '0') + const multipleUpdateTopic = Utils.Hex.makeHexString(random) as Topic + + const updatePeriod = 5000 + const feedRw = streamingFeed.makeFeedRW(multipleUpdateTopic, signer) + + const numUpdates = 5 + + const initialTime = getCurrentTime() + + const sleep = async (seconds: number) => + new Promise((resolve, reject) => { + setTimeout(() => resolve(true), seconds * 1000) + }) + let lookupTime = 0 + for (let i = 0; i < 0 + numUpdates; i++) { + const referenceI = new Uint8Array([i, ...referenceBytes.slice(1)]) as Bytes<32> + + await feedRw.setLastUpdate(batchId, Utils.Hex.bytesToHex(referenceI), initialTime, updatePeriod, lookupTime) + await sleep(5) + await feedRw.getUpdate(initialTime, updatePeriod, lookupTime) + lookupTime = getCurrentTime() + } + + const feedUpdateResponse = await feedRw.getUpdates(initialTime, updatePeriod) + expect(feedUpdateResponse[0].updatePeriod).toEqual(5000) + expect(feedUpdateResponse[1].updatePeriod).toEqual(5000) + expect(feedUpdateResponse[2].updatePeriod).toEqual(5000) + expect(feedUpdateResponse[3].updatePeriod).toEqual(5000) + expect(feedUpdateResponse[4].updatePeriod).toEqual(5000) + expect(feedUpdateResponse.length).toEqual(5) + }, 45000) +}) From 948fc5c4a9a674e68e4c5e1690e37a3968295065 Mon Sep 17 00:00:00 2001 From: Rogelio Morrell Date: Thu, 7 Jul 2022 13:23:40 -0500 Subject: [PATCH 02/21] feat(feed): docs --- README.md | 369 +++++++++++++++++++++++++++++++++++++++++++++++ src/streaming.ts | 2 +- 2 files changed, 370 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 5a50f84..3f12c1e 100644 --- a/README.md +++ b/README.md @@ -9,3 +9,372 @@ The current Feed implementations only let you fetch the latest update and append This library gives you more freedom for Swarm Feed reading and manipulation by specifying indices on actions and add additional payload. For new feed implementations, there are interfaces to implement in order to facilitate the integrations and make interoperability between Feed types. + +# API + +## Sequential feeds + +## Feed Reader + +## makeFeedR + +Creates a new Feed reader + +### Arguments + +| Name | Type | Description | +| ---- | ---- | ----------- | +| `topic` | `Topic | Uint8Array | string` | The feeds topic | +| `owner` | `EthAddress | Uint8Array | string` | Address of signer | +| `options` | `any ` | Options | + +### Returns + +Returns a feed reader object of type SwarmFeedR + +### Example + +```typescript +import { Bee, Reference, Topic, Utils } from '@ethersphere/bee-js' +import type { HexString } from '@ethersphere/bee-js/dist/src/utils/hex' +import { SequentialFeed } from '../../src/sequential-feed' +import { assertBytes, Bytes, bytesToHex, hexToBytes, makePrivateKeySigner } from '../../src/utils' +import { beeUrl, getPostageBatch } from '../utils' + +const myIdentity = { + privateKey: '...private key as hex...' as HexString, + publicKey: '03c32bb011339667a487b6c1c35061f15f7edc36aa9a0f8648aba07a4b8bd741b4' as HexString, + address: '8d3766440f0d7b949a5e32995d09619a7f86e632' as HexString, +} +const owner = Utils.Hex.makeHexString(myIdentity.address, 40) +const signer = makePrivateKeySigner(hexToBytes(myIdentity.privateKey) as Bytes<32>) +const topic = '0000000000000000000000000000000000000000000000000000000000000000' as Topic +const bee = new Bee(beeUrl()) +const batchId = getPostageBatch() + +const sequentialFeed = new SequentialFeed(bee) + +// Feed Reader +const emptyTopic = '1200000000000000000000000000000000000000000000000000000000000001' as Topic +const feedR = sequentialFeed.makeFeedR(emptyTopic, testIdentity.address) +const lastIndex = await feedR.getLastIndex() + +``` + +## getLastIndex + +Gets the last index of the feed + +### Arguments + +None + +### Returns + +A number + + +## findLastUpdate + +Gets the last chunk + +### Arguments + +| Name | Type | Description | +| ---- | ---- | ----------- | +| `options` | `any ` | Options | + + +### Returns + +A FeedChunk object + + +## getUpdate + +Gets a chunk update from a index + +### Arguments + +| Name | Type | Description | +| ---- | ---- | ----------- | +| `index` | `Index ` | Options | + + +### Returns + +A FeedChunk object + + +## getUpdates + +Gets a set of chunks + +### Arguments + +| Name | Type | Description | +| ---- | ---- | ----------- | +| `indices` | `Index[] ` | Options | + + +### Returns + +A FeedChunk object + + +## Feed Writer + +## makeFeedRW + +Creates a new Feed read-writer + +### Arguments + +| Name | Type | Description | +| ---- | ---- | ----------- | +| `topic` | `Topic | Uint8Array | string` | The feeds topic | +| `signer` | `Signer | Uint8Array | string` | Address of signer | +| `options` | `any ` | Options | + +### Returns + +Returns a feed reader-writer object of type SwarmFeedRW + +### Example + +```typescript +import { Bee, Reference, Topic, Utils } from '@ethersphere/bee-js' +import type { HexString } from '@ethersphere/bee-js/dist/src/utils/hex' +import { SequentialFeed } from '../../src/sequential-feed' +import { assertBytes, Bytes, bytesToHex, hexToBytes, makePrivateKeySigner } from '../../src/utils' +import { beeUrl, getPostageBatch } from '../utils' + +const myIdentity = { + privateKey: '...private key as hex...' as HexString, + publicKey: '03c32bb011339667a487b6c1c35061f15f7edc36aa9a0f8648aba07a4b8bd741b4' as HexString, + address: '8d3766440f0d7b949a5e32995d09619a7f86e632' as HexString, +} +const owner = Utils.Hex.makeHexString(myIdentity.address, 40) +const signer = makePrivateKeySigner(hexToBytes(myIdentity.privateKey) as Bytes<32>) +const topic = '0000000000000000000000000000000000000000000000000000000000000000' as Topic +const bee = new Bee(beeUrl()) +const batchId = getPostageBatch() + +const sequentialFeed = new SequentialFeed(bee) + +// Feed Reader/Writer +const feedRw = sequentialFeed.makeFeedRW(topic, signer) +const currentIndex = await feedRw.getLastIndex() + +const testReference: Reference = '0000000000000000000000000000000000000000000000000000000000000124' as HexString<64> +const feedReference = await feedRw.setLastUpdate(batchId, testReference) + +const feedUpdate = await feedRw.findLastUpdate() +``` + +## setLastUpdate + +Appends a new chunk to a feed + +### Arguments + +| Name | Type | Description | +| ---- | ---- | ----------- | +| `postageBatchId` | `string | BatchId ` | postage batch id | +| `reference` | `Reference` | reference | + + +### Returns + +A Reference object + + +## setUpdate + +Inserts a new chunk to a feed at a specified index position + +### Arguments + +| Name | Type | Description | +| ---- | ---- | ----------- | +| `index` | `Index ` | index | +| `postageBatchId` | `string | BatchId ` | postage batch id | +| `reference` | `Reference` | reference | + + +### Returns + +A Reference object + + +## Streaming feeds + +## Feed Reader + +## makeFeedR + +Creates a new StreamingFeed reader + +### Arguments + +| Name | Type | Description | +| ---- | ---- | ----------- | +| `topic` | `Topic | Uint8Array | string` | The feeds topic | +| `owner` | `EthAddress | Uint8Array | string` | Address of signer | +| `options` | `any ` | Options | + +### Returns + +Returns a feed reader object of type SwarmStreamingFeedR + +### Example + +```typescript +import { Bee, Reference, Topic, Utils } from '@ethersphere/bee-js' +import { getCurrentTime, StreamingFeed } from '../../src/streaming-feed' +import { assertBytes, Bytes, bytesToHex, HexString, hexToBytes, makePrivateKeySigner } from '../../src/utils' +import { beeUrl, getPostageBatch } from '../utils' + +const myIdentity = { + privateKey: '...private key as hex...' as HexString, + publicKey: '03c32bb011339667a487b6c1c35061f15f7edc36aa9a0f8648aba07a4b8bd741b4' as HexString, + address: '8d3766440f0d7b949a5e32995d09619a7f86e632' as HexString, +} +const owner = Utils.Hex.makeHexString(myIdentity.address, 40) +const signer = makePrivateKeySigner(hexToBytes(myIdentity.privateKey) as Bytes<32>) +const topic = '0000000000000000000000000000000000000000000000000000000000000000' as Topic +const bee = new Bee(beeUrl()) +const batchId = getPostageBatch() + +const streamingFeed = new StreamingFeed(bee) + +// Feed Reader +const emptyTopic = '1200000000000000000000000000000000000000000000000000000000000001' as Topic +const feedR = streamingFeed.makeFeedR(emptyTopic, testIdentity.address) +const lastIndex = await feedR.getIndexForArbitraryTime(getCurrentTime()) +``` + +## getIndexForArbitraryTime + +Gets an index from an arbitrary time + +### Arguments + +| Name | Type | Description | +| ---- | ---- | ----------- | +| `lookupTime` | `number ` | Time position to lookup | +| `initialTime` | `number ` | feed chunk timestamp for index at 0 | +| `updatePeriod` | `number ` | feed update frequency | + +### Returns + +An Index object + + +## getUpdate + +Gets a chunk update from an arbitrary time, if lookup time is empty, it will return the latest update + +### Arguments + +| Name | Type | Description | +| ---- | ---- | ----------- | +| `initialTime` | `number ` | feed chunk timestamp for index at 0 | +| `updatePeriod` | `number ` | feed update frequency | +| `lookupTime` | `number ` | Time position to lookup (optional) | + + +### Returns + +A StreamingFeedChunk object + + +## getUpdates + +Gets a set of chunks from an arbitray time + +### Arguments + +| Name | Type | Description | +| ---- | ---- | ----------- | +| `initialTime` | `number ` | feed chunk timestamp for index at 0 | +| `updatePeriod` | `number ` | feed update frequency | + + +### Returns + +A StreamingFeedChunk object + + +## Feed Writer + +## makeFeedRW + +Creates a new Feed read-writer + +### Arguments + +| Name | Type | Description | +| ---- | ---- | ----------- | +| `topic` | `Topic | Uint8Array | string` | The feeds topic | +| `signer` | `Signer | Uint8Array | string` | Address of signer | +| `options` | `any ` | Options | + +### Returns + +Returns a feed reader-writer object of type SwarmFeedRW + +### Example + +```typescript +import { Bee, Reference, Topic, Utils } from '@ethersphere/bee-js' +import { getCurrentTime, StreamingFeed } from '../../src/streaming-feed' +import { assertBytes, Bytes, bytesToHex, HexString, hexToBytes, makePrivateKeySigner } from '../../src/utils' +import { beeUrl, getPostageBatch } from '../utils' + +const myIdentity = { + privateKey: '...private key as hex...' as HexString, + publicKey: '03c32bb011339667a487b6c1c35061f15f7edc36aa9a0f8648aba07a4b8bd741b4' as HexString, + address: '8d3766440f0d7b949a5e32995d09619a7f86e632' as HexString, +} +const owner = Utils.Hex.makeHexString(myIdentity.address, 40) +const signer = makePrivateKeySigner(hexToBytes(myIdentity.privateKey) as Bytes<32>) +const topic = '0000000000000000000000000000000000000000000000000000000000000000' as Topic +const bee = new Bee(beeUrl()) +const batchId = getPostageBatch() + +const streamingFeed = new StreamingFeed(bee) + +// Feed Reader/Writer +const feedRw = streamingFeed.makeFeedRW(topic, signer) + +const initialTime = getCurrentTime() +const updatePeriod = 5000 +const testReference: Reference = '0000000000000000000000000000000000000000000000000000000000000126' as HexString<64> +await feedRw.setLastUpdate(batchId, testReference, initialTime, updatePeriod) + +const feedUpdate = await feedRw.getUpdate(initialTime, updatePeriod) +const lastIndex = await feedRw.getIndexForArbitraryTime(getCurrentTime(), initialTime, updatePeriod) + +``` + +## setLastUpdate + +Appends a new chunk to a feed, if the lookup time is empty, it will be added to the end + +### Arguments + +| Name | Type | Description | +| ---- | ---- | ----------- | +| `postageBatchId` | `string | BatchId ` | postage batch id | +| `reference` | `Reference` | reference | +| `initialTime` | `number ` | feed chunk timestamp for index at 0 | +| `updatePeriod` | `number ` | feed update frequency | +| `lookupTime` | `number ` | Time position to lookup (optional) | + + + +### Returns + +A Reference object + diff --git a/src/streaming.ts b/src/streaming.ts index 3fd90cc..221688f 100644 --- a/src/streaming.ts +++ b/src/streaming.ts @@ -58,7 +58,7 @@ export interface SwarmStreamingFeedR extends SwarmFeedHandler { getUpdates(initialTime: number, updatePeriod: number): Promise[]> } -/** Swarm Feed Read and operations */ +/** Swarm Feed Read and Write operations */ export interface SwarmStreamingFeedRW extends SwarmStreamingFeedR { setLastUpdate( postageBatchId: string | BatchId, From 1f1a5036076ce660b4b14f539e19bad001ff719d Mon Sep 17 00:00:00 2001 From: Rogelio Morrell Date: Mon, 11 Jul 2022 18:03:30 -0500 Subject: [PATCH 03/21] feat: requested changes from code review, pending jsdoc --- README.md | 77 ++++++-------------- src/streaming-feed.ts | 72 +++++++++---------- src/streaming.ts | 96 ++++++++----------------- test/integration/streaming-feed.spec.ts | 4 +- 4 files changed, 88 insertions(+), 161 deletions(-) diff --git a/README.md b/README.md index 3f12c1e..5ae7c51 100644 --- a/README.md +++ b/README.md @@ -37,27 +37,18 @@ Returns a feed reader object of type SwarmFeedR ```typescript import { Bee, Reference, Topic, Utils } from '@ethersphere/bee-js' import type { HexString } from '@ethersphere/bee-js/dist/src/utils/hex' -import { SequentialFeed } from '../../src/sequential-feed' -import { assertBytes, Bytes, bytesToHex, hexToBytes, makePrivateKeySigner } from '../../src/utils' -import { beeUrl, getPostageBatch } from '../utils' +import { SequentialFeed } from 'swarm-feeds' +import { assertBytes, Bytes, bytesToHex, hexToBytes, makePrivateKeySigner } from 'swarm-feeds/dist/src/utils' const myIdentity = { - privateKey: '...private key as hex...' as HexString, - publicKey: '03c32bb011339667a487b6c1c35061f15f7edc36aa9a0f8648aba07a4b8bd741b4' as HexString, address: '8d3766440f0d7b949a5e32995d09619a7f86e632' as HexString, } -const owner = Utils.Hex.makeHexString(myIdentity.address, 40) -const signer = makePrivateKeySigner(hexToBytes(myIdentity.privateKey) as Bytes<32>) const topic = '0000000000000000000000000000000000000000000000000000000000000000' as Topic -const bee = new Bee(beeUrl()) -const batchId = getPostageBatch() - -const sequentialFeed = new SequentialFeed(bee) +const sequentialFeed = new SequentialFeed(new Bee('http://localhost:1633')) // Feed Reader const emptyTopic = '1200000000000000000000000000000000000000000000000000000000000001' as Topic -const feedR = sequentialFeed.makeFeedR(emptyTopic, testIdentity.address) -const lastIndex = await feedR.getLastIndex() +const feedR = sequentialFeed.makeFeedR(emptyTopic, myIdentity.address) ``` @@ -143,33 +134,25 @@ Returns a feed reader-writer object of type SwarmFeedRW ### Example ```typescript + import { Bee, Reference, Topic, Utils } from '@ethersphere/bee-js' import type { HexString } from '@ethersphere/bee-js/dist/src/utils/hex' -import { SequentialFeed } from '../../src/sequential-feed' -import { assertBytes, Bytes, bytesToHex, hexToBytes, makePrivateKeySigner } from '../../src/utils' -import { beeUrl, getPostageBatch } from '../utils' +import { SequentialFeed } from 'swarm-feeds' +import { assertBytes, Bytes, bytesToHex, hexToBytes, makePrivateKeySigner } from 'swarm-feeds/dist/src/utils' const myIdentity = { - privateKey: '...private key as hex...' as HexString, + privateKey: '634fb5a872396d9693e5c9f9d7233cfa93f395c093371017ff44aa9ae6564cdd' as HexString, publicKey: '03c32bb011339667a487b6c1c35061f15f7edc36aa9a0f8648aba07a4b8bd741b4' as HexString, address: '8d3766440f0d7b949a5e32995d09619a7f86e632' as HexString, } -const owner = Utils.Hex.makeHexString(myIdentity.address, 40) const signer = makePrivateKeySigner(hexToBytes(myIdentity.privateKey) as Bytes<32>) const topic = '0000000000000000000000000000000000000000000000000000000000000000' as Topic -const bee = new Bee(beeUrl()) -const batchId = getPostageBatch() - -const sequentialFeed = new SequentialFeed(bee) +const sequentialFeed = new SequentialFeed(new Bee('http://localhost:1633')) // Feed Reader/Writer const feedRw = sequentialFeed.makeFeedRW(topic, signer) const currentIndex = await feedRw.getLastIndex() -const testReference: Reference = '0000000000000000000000000000000000000000000000000000000000000124' as HexString<64> -const feedReference = await feedRw.setLastUpdate(batchId, testReference) - -const feedUpdate = await feedRw.findLastUpdate() ``` ## setLastUpdate @@ -231,28 +214,20 @@ Returns a feed reader object of type SwarmStreamingFeedR ```typescript import { Bee, Reference, Topic, Utils } from '@ethersphere/bee-js' -import { getCurrentTime, StreamingFeed } from '../../src/streaming-feed' -import { assertBytes, Bytes, bytesToHex, HexString, hexToBytes, makePrivateKeySigner } from '../../src/utils' -import { beeUrl, getPostageBatch } from '../utils' +import type { HexString } from '@ethersphere/bee-js/dist/src/utils/hex' +import { StreamingFeed } from 'swarm-feeds' +import { assertBytes, Bytes, bytesToHex, hexToBytes, makePrivateKeySigner } from 'swarm-feeds/dist/src/utils' const myIdentity = { - privateKey: '...private key as hex...' as HexString, - publicKey: '03c32bb011339667a487b6c1c35061f15f7edc36aa9a0f8648aba07a4b8bd741b4' as HexString, address: '8d3766440f0d7b949a5e32995d09619a7f86e632' as HexString, } -const owner = Utils.Hex.makeHexString(myIdentity.address, 40) -const signer = makePrivateKeySigner(hexToBytes(myIdentity.privateKey) as Bytes<32>) const topic = '0000000000000000000000000000000000000000000000000000000000000000' as Topic -const bee = new Bee(beeUrl()) -const batchId = getPostageBatch() - -const streamingFeed = new StreamingFeed(bee) +const streamingFeed = new StreamingFeed(new Bee('http://localhost:1633')) // Feed Reader const emptyTopic = '1200000000000000000000000000000000000000000000000000000000000001' as Topic -const feedR = streamingFeed.makeFeedR(emptyTopic, testIdentity.address) -const lastIndex = await feedR.getIndexForArbitraryTime(getCurrentTime()) -``` +const feedR = streamingFeed.makeFeedR(emptyTopic, myIdentity.address) + ``` ## getIndexForArbitraryTime @@ -328,34 +303,22 @@ Returns a feed reader-writer object of type SwarmFeedRW ```typescript import { Bee, Reference, Topic, Utils } from '@ethersphere/bee-js' -import { getCurrentTime, StreamingFeed } from '../../src/streaming-feed' -import { assertBytes, Bytes, bytesToHex, HexString, hexToBytes, makePrivateKeySigner } from '../../src/utils' -import { beeUrl, getPostageBatch } from '../utils' +import type { HexString } from '@ethersphere/bee-js/dist/src/utils/hex' +import { StreamingFeed } from 'swarm-feeds' +import { assertBytes, Bytes, bytesToHex, hexToBytes, makePrivateKeySigner } from 'swarm-feeds/dist/src/utils' const myIdentity = { - privateKey: '...private key as hex...' as HexString, + privateKey: '634fb5a872396d9693e5c9f9d7233cfa93f395c093371017ff44aa9ae6564cdd' as HexString, publicKey: '03c32bb011339667a487b6c1c35061f15f7edc36aa9a0f8648aba07a4b8bd741b4' as HexString, address: '8d3766440f0d7b949a5e32995d09619a7f86e632' as HexString, } -const owner = Utils.Hex.makeHexString(myIdentity.address, 40) const signer = makePrivateKeySigner(hexToBytes(myIdentity.privateKey) as Bytes<32>) const topic = '0000000000000000000000000000000000000000000000000000000000000000' as Topic -const bee = new Bee(beeUrl()) -const batchId = getPostageBatch() - -const streamingFeed = new StreamingFeed(bee) +const streamingFeed = new StreamingFeed(new Bee('http://localhost:1633')) // Feed Reader/Writer const feedRw = streamingFeed.makeFeedRW(topic, signer) -const initialTime = getCurrentTime() -const updatePeriod = 5000 -const testReference: Reference = '0000000000000000000000000000000000000000000000000000000000000126' as HexString<64> -await feedRw.setLastUpdate(batchId, testReference, initialTime, updatePeriod) - -const feedUpdate = await feedRw.getUpdate(initialTime, updatePeriod) -const lastIndex = await feedRw.getIndexForArbitraryTime(getCurrentTime(), initialTime, updatePeriod) - ``` ## setLastUpdate diff --git a/src/streaming-feed.ts b/src/streaming-feed.ts index 2fd8f8f..b6738ab 100644 --- a/src/streaming-feed.ts +++ b/src/streaming-feed.ts @@ -1,8 +1,7 @@ import { BatchId, Bee, Reference, Signer, Topic, Utils } from '@ethersphere/bee-js' -import { FeedType } from './feed' +import { FeedType, makeTopic } from './feed' import { assembleSocPayload, - makeTopic, mapSocToFeed, StreamingFeedChunk, SwarmStreamingFeed, @@ -24,25 +23,18 @@ export class StreamingFeed implements SwarmStreamingFeed { public makeFeedR( topic: Topic | Uint8Array | string, owner: Utils.Eth.EthAddress | Uint8Array | string, - ): SwarmStreamingFeedR { + ): SwarmStreamingFeedR { const socReader = this.bee.makeSOCReader(owner) const topicHex = makeTopic(topic) const topicBytes = hexToBytes<32>(topicHex) const ownerHex = Utils.Eth.makeHexEthAddress(owner) - const getIndexForArbitraryTime = async ( - lookupTime: number, - initialTime: number, - updatePeriod: number, - ): Promise => { + const getIndexForArbitraryTime = (lookupTime: number, initialTime: number, updatePeriod: number): number => { const currentTime = getCurrentTime() // Tp - try { - // the nearest last index to an arbitrary time (Tx) where T0 <= Tx <= Tn <= Tp - if (currentTime >= initialTime && lookupTime >= initialTime) { - return Math.floor((lookupTime - initialTime) / updatePeriod) - } - } catch (e) { - // no-op + + // the nearest last index to an arbitrary time (Tx) where T0 <= Tx <= Tn <= Tp + if (currentTime >= initialTime && lookupTime >= initialTime) { + return Math.floor((lookupTime - initialTime) / updatePeriod) } return -1 @@ -55,7 +47,7 @@ export class StreamingFeed implements SwarmStreamingFeed { lookupTime?: number, ): Promise => { lookupTime = lookupTime ?? getCurrentTime() - const index = await getIndexForArbitraryTime(lookupTime, initialTime, updatePeriod) + const index = getIndexForArbitraryTime(lookupTime, initialTime, updatePeriod) const socChunk = await socReader.download(this.getIdentifier(topicBytes, index)) return mapSocToFeed(socChunk) @@ -64,21 +56,29 @@ export class StreamingFeed implements SwarmStreamingFeed { // Download Feed Stream const getUpdates = async (initialTime: number, updatePeriod: number): Promise => { const feeds: StreamingFeedChunk[] = [] - // while from last to first, use lookupTime = chunk.timestamp + 1 - - const index = await getIndexForArbitraryTime(getCurrentTime(), initialTime, updatePeriod) - const socChunk = await socReader.download(this.getIdentifier(topicBytes, index - 1)) - - let feed = await mapSocToFeed(socChunk) - let i = 1 - feeds.push(feed) - while (i < index) { - feed = await getUpdate(initialTime, updatePeriod, feed.timestamp + updatePeriod + 1) - i++ - feeds.push(feed) - } - return feeds.reverse() + try { + let index = await getIndexForArbitraryTime(getCurrentTime(), initialTime, updatePeriod) + + index-- + + let lookupTime = getCurrentTime() + let feed + while (index > -1) { + // throws + const socChunk = await socReader.download(this.getIdentifier(topicBytes, index)) + feed = mapSocToFeed(socChunk) + lookupTime -= feed.updatePeriod + + feeds.push(feed) + index = await getIndexForArbitraryTime(lookupTime, initialTime, updatePeriod) + index-- + } + + return feeds + } catch (e) { + return feeds + } } return { @@ -91,17 +91,14 @@ export class StreamingFeed implements SwarmStreamingFeed { } } - public makeFeedRW( - topic: string | Topic | Uint8Array, - signer: string | Uint8Array | Signer, - ): SwarmStreamingFeedRW { + public makeFeedRW(topic: string | Topic | Uint8Array, signer: string | Uint8Array | Signer): SwarmStreamingFeedRW { const canonicalSigner = makeSigner(signer) const topicHex = makeTopic(topic) const topicBytes = hexToBytes<32>(topicHex) const feedR = this.makeFeedR(topic, canonicalSigner.address) const socWriter = this.bee.makeSOCWriter(canonicalSigner) - const _setUpdate = async ( + const setUpdate = async ( index: number, postageBatchId: string | BatchId, reference: Reference, @@ -129,14 +126,15 @@ export class StreamingFeed implements SwarmStreamingFeed { lookupTime: number, ): Promise => { lookupTime = lookupTime ?? getCurrentTime() - const lastIndex = await feedR.getIndexForArbitraryTime(lookupTime, initialTime, updatePeriod) + const lastIndex = feedR.getIndexForArbitraryTime(lookupTime, initialTime, updatePeriod) - return _setUpdate(lastIndex, postageBatchId, reference, initialTime, updatePeriod) + return setUpdate(lastIndex, postageBatchId, reference, initialTime, updatePeriod) } return { ...feedR, setLastUpdate, + setUpdate, } } diff --git a/src/streaming.ts b/src/streaming.ts index 221688f..394738d 100644 --- a/src/streaming.ts +++ b/src/streaming.ts @@ -1,30 +1,13 @@ -import { BatchId, Bee, Reference, Signer, Topic, Utils } from '@ethersphere/bee-js' +import { BatchId, Bee, Reference, Signer, Topic } from '@ethersphere/bee-js' import type { SingleOwnerChunk } from '@ethersphere/bee-js/dist/src/chunk/soc' import type { ChunkReference } from '@ethersphere/bee-js/dist/src/feed' import type { EthAddress } from '@ethersphere/bee-js/dist/src/utils/eth' import { FeedType, SwarmFeedHandler } from './feed' -import { - assertBytes, - Bytes, - bytesToHex, - hexToBytes, - readUint64BigEndian, - serializeBytes, - TOPIC_BYTES_LENGTH, - TOPIC_HEX_LENGTH, - writeUint64BigEndian, -} from './utils' +import { Bytes, readUint64BigEndian, serializeBytes, writeUint64BigEndian } from './utils' -export type StreamingFeedData = { - timestamp: number +export interface StreamingFeedChunk extends SingleOwnerChunk { + index: number reference: ChunkReference - updatePeriod: number - chunkIndex: number -} - -export interface StreamingFeedChunk extends SingleOwnerChunk { - index: Index - reference: Reference timestamp: number updatePeriod: number } @@ -40,26 +23,26 @@ export interface SwarmStreamingFeed { topic: Topic | Uint8Array | string, owner: EthAddress | Uint8Array | string, ...options: any[] - ): SwarmStreamingFeedR + ): SwarmStreamingFeedR /** get Feed interface with write and read operations */ makeFeedRW( topic: Topic | Uint8Array | string, signer: Signer | Uint8Array | string, options?: any, - ): SwarmStreamingFeedRW + ): SwarmStreamingFeedRW /** Get Single Owner Chunk identifier */ getIdentifier(topic: Bytes<32>, index: Index): Bytes<32> } /** Swarm Feed Read operations */ -export interface SwarmStreamingFeedR extends SwarmFeedHandler { - getIndexForArbitraryTime(lookupTime: number, initialTime?: number, updatePeriod?: number): Promise | Index - getUpdate(initialTime: number, updatePeriod: number, lookupTime?: Index): Promise> - getUpdates(initialTime: number, updatePeriod: number): Promise[]> +export interface SwarmStreamingFeedR extends SwarmFeedHandler { + getIndexForArbitraryTime(lookupTime: number, initialTime?: number, updatePeriod?: number): number + getUpdate(initialTime: number, updatePeriod: number, lookupTime?: number): Promise + getUpdates(initialTime: number, updatePeriod: number): Promise } /** Swarm Feed Read and Write operations */ -export interface SwarmStreamingFeedRW extends SwarmStreamingFeedR { +export interface SwarmStreamingFeedRW extends SwarmStreamingFeedR { setLastUpdate( postageBatchId: string | BatchId, reference: Reference, @@ -67,9 +50,17 @@ export interface SwarmStreamingFeedRW extends SwarmStreamingFeed updatePeriod: number, lookupTime?: number, ): Promise + setUpdate( + index: number, + postageBatchId: string | BatchId, + reference: Reference, + initialTime: number, + updatePeriod: number, + lookupTime?: number, + ): Promise } -export function extractDataFromSocPayload(payload: Uint8Array): StreamingFeedData { +export function extractDataFromSocPayload(version: number, payload: Uint8Array): StreamingFeedChunk { const index = readUint64BigEndian(payload.slice(0, 8) as Bytes<8>) const updatePeriod = readUint64BigEndian(payload.slice(8, 16) as Bytes<8>) const timestamp = readUint64BigEndian(payload.slice(16, 24) as Bytes<8>) @@ -79,61 +70,36 @@ export function extractDataFromSocPayload(payload: Uint8Array): StreamingFeedDat return { timestamp, updatePeriod, - chunkIndex: index, + index, reference: p as ChunkReference, - } + } as any } // TODO handle JSON-like metadata throw new Error('NotImplemented: payload is longer than expected') } -export function mapSocToFeed(socChunk: SingleOwnerChunk): StreamingFeedChunk { - const { reference, timestamp, updatePeriod, chunkIndex } = extractDataFromSocPayload(socChunk.payload()) +export function mapSocToFeed(socChunk: SingleOwnerChunk): StreamingFeedChunk { + const VERSION = 3 + const { reference, timestamp, updatePeriod, index } = extractDataFromSocPayload(VERSION, socChunk.payload()) return { ...socChunk, - index: chunkIndex as unknown as Index, + index, timestamp, updatePeriod, - reference: bytesToHex(reference), + reference: reference, } } export function assembleSocPayload( reference: ChunkReference, - options?: { at?: number; updatePeriod?: number; index?: number }, + options: { at: number; updatePeriod: number; index: number }, ): Uint8Array { - const at = options?.at ?? Date.now() / 1000.0 + const at = options.at ?? Date.now() / 1000.0 const timestamp = writeUint64BigEndian(at) - const updatePeriod = writeUint64BigEndian(options?.updatePeriod ?? 0) - const chunkIndex = writeUint64BigEndian(options?.index ?? -1) + const updatePeriod = writeUint64BigEndian(options.updatePeriod) + const chunkIndex = writeUint64BigEndian(options.index) return serializeBytes(chunkIndex, updatePeriod, timestamp, reference) } - -/** Converts feedIndex response to integer */ -export function fetchIndexToInt(fetchIndex: string): number { - const indexBytes = hexToBytes(fetchIndex) - let index = 0 - for (let i = indexBytes.length - 1; i >= 0; i--) { - const byte = indexBytes[i] - - if (byte === 0) break - - index += byte - } - - return index -} - -export function makeTopic(topic: Uint8Array | string): Topic { - if (typeof topic === 'string') { - return Utils.Hex.makeHexString(topic, TOPIC_HEX_LENGTH) - } else if (topic instanceof Uint8Array) { - assertBytes<32>(topic, TOPIC_BYTES_LENGTH) - - return bytesToHex(topic, TOPIC_HEX_LENGTH) - } - throw new TypeError('invalid topic') -} diff --git a/test/integration/streaming-feed.spec.ts b/test/integration/streaming-feed.spec.ts index acc533a..fca9850 100644 --- a/test/integration/streaming-feed.spec.ts +++ b/test/integration/streaming-feed.spec.ts @@ -58,7 +58,7 @@ describe('streaming feed', () => { new Promise((resolve, reject) => { setTimeout(() => resolve(true), seconds * 1000) }) - let lookupTime = 0 + let lookupTime = getCurrentTime() for (let i = 0; i < 0 + numUpdates; i++) { const referenceI = new Uint8Array([i, ...referenceBytes.slice(1)]) as Bytes<32> @@ -69,11 +69,11 @@ describe('streaming feed', () => { } const feedUpdateResponse = await feedRw.getUpdates(initialTime, updatePeriod) + expect(feedUpdateResponse.length).toEqual(5) expect(feedUpdateResponse[0].updatePeriod).toEqual(5000) expect(feedUpdateResponse[1].updatePeriod).toEqual(5000) expect(feedUpdateResponse[2].updatePeriod).toEqual(5000) expect(feedUpdateResponse[3].updatePeriod).toEqual(5000) expect(feedUpdateResponse[4].updatePeriod).toEqual(5000) - expect(feedUpdateResponse.length).toEqual(5) }, 45000) }) From 0f11f46473d72b657029e56dc836af0fe794f6f6 Mon Sep 17 00:00:00 2001 From: Rogelio Morrell Date: Tue, 12 Jul 2022 11:14:06 -0500 Subject: [PATCH 04/21] docs: added jsdoc --- src/sequential-feed.ts | 50 ++++++++++++++++++++++++++++++++- src/streaming-feed.ts | 64 ++++++++++++++++++++++++++++++++++++++---- 2 files changed, 108 insertions(+), 6 deletions(-) diff --git a/src/sequential-feed.ts b/src/sequential-feed.ts index a621d53..b461c84 100644 --- a/src/sequential-feed.ts +++ b/src/sequential-feed.ts @@ -23,6 +23,12 @@ export class SequentialFeed implements SwarmFeed { this.type = 'sequential' } + /** + * Creates a sequential feed reader + * @param topic a swarm topic + * @param signer signer + * @returns a sequential feed reader + */ public makeFeedR( topic: Topic | Uint8Array | string, owner: Utils.Eth.EthAddress | Uint8Array | string, @@ -32,6 +38,10 @@ export class SequentialFeed implements SwarmFeed { const topicBytes = hexToBytes<32>(topicHex) const ownerHex = Utils.Eth.makeHexEthAddress(owner) + /** + * Gets the last index in the feed + * @returns An index number + */ const getLastIndex = async (): Promise => { // It fetches the latest feed on bee-side, because it is faster than lookup for the last index by individual API calls. const feedReader = this.bee.makeFeedReader('sequence', topic, owner) @@ -48,6 +58,10 @@ export class SequentialFeed implements SwarmFeed { return index } + /** + * Gets the last appended chunk in the feed + * @returns A feed chunk + */ const findLastUpdate = async (): Promise => { const index = await getLastIndex() const socChunk = await socReader.download(this.getIdentifier(topicBytes, index)) @@ -55,12 +69,22 @@ export class SequentialFeed implements SwarmFeed { return mapSocToFeed(socChunk, index) } + /** + * Downloads a chunk by index number + * @param index index number + * @returns A feed chunk + */ const getUpdate = async (index: number): Promise => { const socChunk = await socReader.download(this.getIdentifier(topicBytes, index)) return mapSocToFeed(socChunk, index) } + /** + * Download all chunk by indices + * @param indices an array of index numbers + * @returns An array of chunks + */ const getUpdates = async (indices: number[]): Promise => { const promises: Promise[] = [] for (const index of indices) { @@ -85,6 +109,12 @@ export class SequentialFeed implements SwarmFeed { } } + /** + * Creates a sequential feed reader / writer + * @param topic a swarm topic + * @param signer signer + * @returns a sequential feed reader / writer + */ public makeFeedRW(topic: string | Topic | Uint8Array, signer: string | Uint8Array | Signer): SwarmFeedRW { const canonicalSigner = makeSigner(signer) const topicHex = makeTopic(topic) @@ -92,6 +122,13 @@ export class SequentialFeed implements SwarmFeed { const feedR = this.makeFeedR(topic, canonicalSigner.address) const socWriter = this.bee.makeSOCWriter(canonicalSigner) + /** + * Sets the upload chunk to update + * @param index the chunk index to update + * @param postageBatchId swarm postage batch id + * @param reference chunk reference + * @returns a chunk reference + */ const setUpdate = async ( index: number, postageBatchId: string | BatchId, @@ -106,6 +143,12 @@ export class SequentialFeed implements SwarmFeed { ) } + /** + * Sets the next upload chunk + * @param postageBatchId swarm postage batch id + * @param reference chunk reference + * @returns a chunk reference + */ const setLastUpdate = async (postageBatchId: string | BatchId, reference: Reference): Promise => { let index: number try { @@ -125,7 +168,12 @@ export class SequentialFeed implements SwarmFeed { } } - /** Get Single Owner Chunk identifier */ + /** + * Get Single Owner Chunk identifier + * @param topic a swarm topic, bytes 32 length + * @param index the chunk index + * @returns a bytes 32 + */ public getIdentifier(topic: Utils.Bytes.Bytes<32>, index: number): Utils.Bytes.Bytes<32> { const indexBytes = writeUint64BigEndian(index) diff --git a/src/streaming-feed.ts b/src/streaming-feed.ts index b6738ab..13b2687 100644 --- a/src/streaming-feed.ts +++ b/src/streaming-feed.ts @@ -20,6 +20,12 @@ export class StreamingFeed implements SwarmStreamingFeed { this.type = 'fault-tolerant-stream' } + /** + * Creates a streaming feed reader + * @param topic a swarm topic + * @param owner owner + * @returns a streaming feed reader + */ public makeFeedR( topic: Topic | Uint8Array | string, owner: Utils.Eth.EthAddress | Uint8Array | string, @@ -29,6 +35,13 @@ export class StreamingFeed implements SwarmStreamingFeed { const topicBytes = hexToBytes<32>(topicHex) const ownerHex = Utils.Eth.makeHexEthAddress(owner) + /** + * Calculates nearest index + * @param initialTime initial time of streaming feed + * @param updatePeriod streaming feed frequency in milliseconds + * @param lookupTime lookup time + * @returns Returns -1 if not found, otherwise the index + */ const getIndexForArbitraryTime = (lookupTime: number, initialTime: number, updatePeriod: number): number => { const currentTime = getCurrentTime() // Tp @@ -40,7 +53,13 @@ export class StreamingFeed implements SwarmStreamingFeed { return -1 } - // Download Feed Chunk at Specific Time + /** + * Download Feed Chunk at Specific Time + * @param initialTime initial time of streaming feed + * @param updatePeriod streaming feed frequency in milliseconds + * @param lookupTime lookup time + * @returns a StreamingFeedChunk object + */ const getUpdate = async ( initialTime: number, updatePeriod: number, @@ -53,12 +72,17 @@ export class StreamingFeed implements SwarmStreamingFeed { return mapSocToFeed(socChunk) } - // Download Feed Stream + /** + * Download all feed chunks + * @param initialTime initial time of streaming feed + * @param updatePeriod streaming feed frequency in milliseconds + * @returns a StreamingFeedChunk array object + */ const getUpdates = async (initialTime: number, updatePeriod: number): Promise => { const feeds: StreamingFeedChunk[] = [] try { - let index = await getIndexForArbitraryTime(getCurrentTime(), initialTime, updatePeriod) + let index = getIndexForArbitraryTime(getCurrentTime(), initialTime, updatePeriod) index-- @@ -71,7 +95,7 @@ export class StreamingFeed implements SwarmStreamingFeed { lookupTime -= feed.updatePeriod feeds.push(feed) - index = await getIndexForArbitraryTime(lookupTime, initialTime, updatePeriod) + index = getIndexForArbitraryTime(lookupTime, initialTime, updatePeriod) index-- } @@ -91,6 +115,12 @@ export class StreamingFeed implements SwarmStreamingFeed { } } + /** + * Creates a streaming feed reader / writer + * @param topic a swarm topic + * @param signer signer + * @returns a streaming feed reader / writer + */ public makeFeedRW(topic: string | Topic | Uint8Array, signer: string | Uint8Array | Signer): SwarmStreamingFeedRW { const canonicalSigner = makeSigner(signer) const topicHex = makeTopic(topic) @@ -98,6 +128,16 @@ export class StreamingFeed implements SwarmStreamingFeed { const feedR = this.makeFeedR(topic, canonicalSigner.address) const socWriter = this.bee.makeSOCWriter(canonicalSigner) + /** + * Sets the upload chunk to update + * @param index the chunk index to update + * @param postageBatchId swarm postage batch id + * @param reference chunk reference + * @param initialTime initial time of streaming feed + * @param updatePeriod streaming feed frequency in milliseconds + * @param lookupTime lookup time + * @returns a chunk reference + */ const setUpdate = async ( index: number, postageBatchId: string | BatchId, @@ -118,6 +158,15 @@ export class StreamingFeed implements SwarmStreamingFeed { ) } + /** + * Sets the next upload chunk + * @param postageBatchId swarm postage batch id + * @param reference chunk reference + * @param initialTime initial time of streaming feed + * @param updatePeriod streaming feed frequency in milliseconds + * @param lookupTime lookup time + * @returns a chunk reference + */ const setLastUpdate = async ( postageBatchId: string | BatchId, reference: Reference, @@ -138,7 +187,12 @@ export class StreamingFeed implements SwarmStreamingFeed { } } - /** Get Single Owner Chunk identifier */ + /** + * Get Single Owner Chunk identifier + * @param topic a swarm topic, bytes 32 length + * @param index the chunk index + * @returns a bytes 32 + */ public getIdentifier(topic: Utils.Bytes.Bytes<32>, index: number): Utils.Bytes.Bytes<32> { const indexBytes = writeUint64BigEndian(index) From 727bb1bfb8bb31d7c07ed383de6315481d7f14ac Mon Sep 17 00:00:00 2001 From: Rogelio Morrell Date: Tue, 12 Jul 2022 17:11:49 -0500 Subject: [PATCH 05/21] docs: updated readme --- README.md | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 5ae7c51..654dbcb 100644 --- a/README.md +++ b/README.md @@ -320,7 +320,6 @@ const streamingFeed = new StreamingFeed(new Bee('http://localhost:1633')) const feedRw = streamingFeed.makeFeedRW(topic, signer) ``` - ## setLastUpdate Appends a new chunk to a feed, if the lookup time is empty, it will be added to the end @@ -337,6 +336,28 @@ Appends a new chunk to a feed, if the lookup time is empty, it will be added to +### Returns + +A Reference object + + +## setUpdate + +Sets a chunk to a feed at an index number + +### Arguments + +| Name | Type | Description | +| ---- | ---- | ----------- | +| `index` | `number` | index | +| `postageBatchId` | `string | BatchId ` | postage batch id | +| `reference` | `Reference` | reference | +| `initialTime` | `number ` | feed chunk timestamp for index at 0 | +| `updatePeriod` | `number ` | feed update frequency | +| `lookupTime` | `number ` | Time position to lookup (optional) | + + + ### Returns A Reference object From 2654a5b8b361a4bb68a4ab84ed1e79485595da1f Mon Sep 17 00:00:00 2001 From: Rogelio Morrell Date: Thu, 21 Jul 2022 07:13:30 -0500 Subject: [PATCH 06/21] ci: testing node --- .github/workflows/test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index d7be4d5..3a16bf5 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -23,7 +23,7 @@ jobs: strategy: matrix: - node-version: [12.x, 14.x, 16.x, 17.x] + node-version: [14.x, 16.x] steps: - name: Checkout From f8b6a1b33f9d971840952c22bab62b0794baa75e Mon Sep 17 00:00:00 2001 From: Rogelio Morrell Date: Thu, 21 Jul 2022 07:19:23 -0500 Subject: [PATCH 07/21] ci: updates --- .github/workflows/test.yml | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 3a16bf5..b37187f 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -23,7 +23,7 @@ jobs: strategy: matrix: - node-version: [14.x, 16.x] + node-version: [12.x, 14.x, 16.x, 17.zx] steps: - name: Checkout @@ -40,13 +40,6 @@ jobs: run: | echo "${{ secrets.GITHUB_TOKEN }}" | docker login https://docker.pkg.github.com -u ${GITHUB_ACTOR} --password-stdin - # Setup Bee environment - - name: Start Bee Factory environment - run: | - git clone --depth=1 https://github.com/ethersphere/bee-factory.git - chmod +x -R ./bee-factory/scripts - ./bee-factory/scripts/environment.sh start --detach --workers=$WORKERS - ## Try getting the node modules from cache, if failed npm ci - uses: actions/cache@v2 id: cache-npm From a1a96b14c922b3dca6d419a06321f26b40709525 Mon Sep 17 00:00:00 2001 From: Rogelio Morrell Date: Thu, 21 Jul 2022 08:37:27 -0500 Subject: [PATCH 08/21] ci: use fdp play --- .github/workflows/test.yml | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index b37187f..00b0078 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -23,7 +23,7 @@ jobs: strategy: matrix: - node-version: [12.x, 14.x, 16.x, 17.zx] + node-version: [12.x, 14.x, 16.x] steps: - name: Checkout @@ -40,7 +40,13 @@ jobs: run: | echo "${{ secrets.GITHUB_TOKEN }}" | docker login https://docker.pkg.github.com -u ${GITHUB_ACTOR} --password-stdin - ## Try getting the node modules from cache, if failed npm ci + - name: Install fdp-play + run: npm install -g @fairdatasociety/fdp-play + + - name: Run fdp-play + run: fdp-play start -d --bee-version $BEE_VERSION + + ## Try getting the node modules from cache, if failed npm ci - uses: actions/cache@v2 id: cache-npm with: From f80cf3517dad51a398f3d5e1011c4b8edfd3a022 Mon Sep 17 00:00:00 2001 From: Rogelio Morrell Date: Thu, 21 Jul 2022 08:43:47 -0500 Subject: [PATCH 09/21] ci: bee 1.6.2 --- .github/workflows/test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 00b0078..a57b59c 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -9,7 +9,7 @@ on: - '**' env: - BEE_VERSION: '1.4.1' + BEE_VERSION: '1.6.2' BLOCKCHAIN_VERSION: '1.2.0' WORKERS: 4 BEE_API_URL: 'http://127.0.0.1:1633' From ddd52a1f4be3a272caf624f5cc7f847f0715e1b9 Mon Sep 17 00:00:00 2001 From: Rogelio Morrell Date: Thu, 18 Aug 2022 12:19:34 -0500 Subject: [PATCH 10/21] feat: resolving some code review comments --- src/sequential-feed.ts | 2 +- src/streaming-feed.ts | 13 +++++-------- src/streaming.ts | 13 +++++++------ test/integration/sequential-feed.spec.ts | 2 +- test/integration/streaming-feed.spec.ts | 8 ++++---- 5 files changed, 18 insertions(+), 20 deletions(-) diff --git a/src/sequential-feed.ts b/src/sequential-feed.ts index b461c84..7e2a682 100644 --- a/src/sequential-feed.ts +++ b/src/sequential-feed.ts @@ -26,7 +26,7 @@ export class SequentialFeed implements SwarmFeed { /** * Creates a sequential feed reader * @param topic a swarm topic - * @param signer signer + * @param owner owner * @returns a sequential feed reader */ public makeFeedR( diff --git a/src/streaming-feed.ts b/src/streaming-feed.ts index 13b2687..d92ce7c 100644 --- a/src/streaming-feed.ts +++ b/src/streaming-feed.ts @@ -1,24 +1,21 @@ import { BatchId, Bee, Reference, Signer, Topic, Utils } from '@ethersphere/bee-js' -import { FeedType, makeTopic } from './feed' +import { makeTopic } from './feed' import { assembleSocPayload, mapSocToFeed, StreamingFeedChunk, - SwarmStreamingFeed, + IStreamingFeed, SwarmStreamingFeedR, SwarmStreamingFeedRW, + FaultTolerantStreamType, } from './streaming' import { ChunkReference, makeSigner, writeUint64BigEndian } from './utils' const { Hex } = Utils const { hexToBytes } = Hex export const getCurrentTime = (d = new Date()) => d.getTime() -export class StreamingFeed implements SwarmStreamingFeed { - public readonly type: FeedType - - public constructor(public readonly bee: Bee) { - this.type = 'fault-tolerant-stream' - } +export class StreamingFeed implements IStreamingFeed { + public constructor(public readonly bee: Bee, public type: FaultTolerantStreamType = 'fault-tolerant-stream') {} /** * Creates a streaming feed reader diff --git a/src/streaming.ts b/src/streaming.ts index 394738d..17ac6fd 100644 --- a/src/streaming.ts +++ b/src/streaming.ts @@ -2,7 +2,7 @@ import { BatchId, Bee, Reference, Signer, Topic } from '@ethersphere/bee-js' import type { SingleOwnerChunk } from '@ethersphere/bee-js/dist/src/chunk/soc' import type { ChunkReference } from '@ethersphere/bee-js/dist/src/feed' import type { EthAddress } from '@ethersphere/bee-js/dist/src/utils/eth' -import { FeedType, SwarmFeedHandler } from './feed' +import { SwarmFeedHandler } from './feed' import { Bytes, readUint64BigEndian, serializeBytes, writeUint64BigEndian } from './utils' export interface StreamingFeedChunk extends SingleOwnerChunk { @@ -12,10 +12,12 @@ export interface StreamingFeedChunk extends SingleOwnerChunk { updatePeriod: number } +export type FaultTolerantStreamType = 'fault-tolerant-stream' + /** Interface for feed type classes */ -export interface SwarmStreamingFeed { +export interface IStreamingFeed { /** Feed type identifier */ - readonly type: FeedType + readonly type: FaultTolerantStreamType /** initialised BeeJS instance */ readonly bee: Bee /** get Feed interface with read operations */ @@ -60,7 +62,7 @@ export interface SwarmStreamingFeedRW extends SwarmStreamingFeedR { ): Promise } -export function extractDataFromSocPayload(version: number, payload: Uint8Array): StreamingFeedChunk { +export function extractDataFromSocPayload(payload: Uint8Array): StreamingFeedChunk { const index = readUint64BigEndian(payload.slice(0, 8) as Bytes<8>) const updatePeriod = readUint64BigEndian(payload.slice(8, 16) as Bytes<8>) const timestamp = readUint64BigEndian(payload.slice(16, 24) as Bytes<8>) @@ -80,8 +82,7 @@ export function extractDataFromSocPayload(version: number, payload: Uint8Array): } export function mapSocToFeed(socChunk: SingleOwnerChunk): StreamingFeedChunk { - const VERSION = 3 - const { reference, timestamp, updatePeriod, index } = extractDataFromSocPayload(VERSION, socChunk.payload()) + const { reference, timestamp, updatePeriod, index } = extractDataFromSocPayload(socChunk.payload()) return { ...socChunk, diff --git a/test/integration/sequential-feed.spec.ts b/test/integration/sequential-feed.spec.ts index 97802bf..13138f5 100644 --- a/test/integration/sequential-feed.spec.ts +++ b/test/integration/sequential-feed.spec.ts @@ -39,7 +39,7 @@ describe('feed', () => { }, 21000) test('multiple updates using setUpdate and lookup', async () => { - const reference = Utils.Hex.makeHexString(new Date().getTime().toString().padStart(64, '0'), 64) + const reference = Utils.Hex.makeHexString('0000000000000000000000000000000000000000000000000000000000000000', 64) const referenceBytes = hexToBytes(reference) assertBytes(referenceBytes, 32) const random = new Date().getTime().toString().padStart(64, '0') diff --git a/test/integration/streaming-feed.spec.ts b/test/integration/streaming-feed.spec.ts index fca9850..1e67221 100644 --- a/test/integration/streaming-feed.spec.ts +++ b/test/integration/streaming-feed.spec.ts @@ -17,10 +17,10 @@ describe('streaming feed', () => { const batchId = getPostageBatch() const streamingFeed = new StreamingFeed(bee) - test('lookup for empty feed update', async () => { + test('lookup for empty feed update', () => { const emptyTopic = '1200000000000000000000000000000000000000000000000000000000000001' as Topic const feedR = streamingFeed.makeFeedR(emptyTopic, testIdentity.address) - const lastIndex = await feedR.getIndexForArbitraryTime(getCurrentTime()) + const lastIndex = feedR.getIndexForArbitraryTime(getCurrentTime()) expect(lastIndex).toBe(-1) }, 40000) @@ -34,7 +34,7 @@ describe('streaming feed', () => { await feedRw.setLastUpdate(batchId, testReference, initialTime, updatePeriod) const feedUpdate = await feedRw.getUpdate(initialTime, updatePeriod) - const lastIndex = await feedRw.getIndexForArbitraryTime(getCurrentTime(), initialTime, updatePeriod) + const lastIndex = feedRw.getIndexForArbitraryTime(getCurrentTime(), initialTime, updatePeriod) expect(feedUpdate.index).toEqual(lastIndex) expect(bytesToHex(feedUpdate.owner())).toEqual(owner) @@ -55,7 +55,7 @@ describe('streaming feed', () => { const initialTime = getCurrentTime() const sleep = async (seconds: number) => - new Promise((resolve, reject) => { + new Promise(resolve => { setTimeout(() => resolve(true), seconds * 1000) }) let lookupTime = getCurrentTime() From 95cf56199e857f171d68ba8a74368450f683e5fa Mon Sep 17 00:00:00 2001 From: Rogelio Morrell Date: Mon, 22 Aug 2022 20:06:41 -0500 Subject: [PATCH 11/21] feat: adds getLastIndex and findLastUpdate --- src/getIndexForArbitraryTime.ts | 19 ++++++++++++++++ src/streaming-feed.ts | 29 ++++++++++++++----------- src/streaming.ts | 2 ++ test/integration/streaming-feed.spec.ts | 26 +++++++++++++++++++++- 4 files changed, 62 insertions(+), 14 deletions(-) create mode 100644 src/getIndexForArbitraryTime.ts diff --git a/src/getIndexForArbitraryTime.ts b/src/getIndexForArbitraryTime.ts new file mode 100644 index 0000000..12d9cfe --- /dev/null +++ b/src/getIndexForArbitraryTime.ts @@ -0,0 +1,19 @@ +export const getCurrentTime = (d = new Date()) => d.getTime() + +/** + * Calculates nearest index + * @param initialTime initial time of streaming feed + * @param updatePeriod streaming feed frequency in milliseconds + * @param lookupTime lookup time + * @returns Returns -1 if not found, otherwise the index + */ +export const getIndexForArbitraryTime = (lookupTime: number, initialTime: number, updatePeriod: number): number => { + const currentTime = getCurrentTime() // Tp + + // the nearest last index to an arbitrary time (Tx) where T0 <= Tx <= Tn <= Tp + if (currentTime >= initialTime && lookupTime >= initialTime) { + return Math.floor((lookupTime - initialTime) / updatePeriod) + } + + return -1 +} diff --git a/src/streaming-feed.ts b/src/streaming-feed.ts index d92ce7c..3798ff7 100644 --- a/src/streaming-feed.ts +++ b/src/streaming-feed.ts @@ -1,5 +1,6 @@ import { BatchId, Bee, Reference, Signer, Topic, Utils } from '@ethersphere/bee-js' import { makeTopic } from './feed' +import { getCurrentTime, getIndexForArbitraryTime } from './getIndexForArbitraryTime' import { assembleSocPayload, mapSocToFeed, @@ -13,7 +14,7 @@ import { ChunkReference, makeSigner, writeUint64BigEndian } from './utils' const { Hex } = Utils const { hexToBytes } = Hex -export const getCurrentTime = (d = new Date()) => d.getTime() + export class StreamingFeed implements IStreamingFeed { public constructor(public readonly bee: Bee, public type: FaultTolerantStreamType = 'fault-tolerant-stream') {} @@ -33,21 +34,21 @@ export class StreamingFeed implements IStreamingFeed { const ownerHex = Utils.Eth.makeHexEthAddress(owner) /** - * Calculates nearest index - * @param initialTime initial time of streaming feed - * @param updatePeriod streaming feed frequency in milliseconds - * @param lookupTime lookup time - * @returns Returns -1 if not found, otherwise the index + * Gets the last index in the feed + * @returns An index number */ - const getIndexForArbitraryTime = (lookupTime: number, initialTime: number, updatePeriod: number): number => { - const currentTime = getCurrentTime() // Tp + const getLastIndex = async (initialTime: number, updatePeriod: number): Promise => { + const lookupTime = getCurrentTime() - // the nearest last index to an arbitrary time (Tx) where T0 <= Tx <= Tn <= Tp - if (currentTime >= initialTime && lookupTime >= initialTime) { - return Math.floor((lookupTime - initialTime) / updatePeriod) - } + return getIndexForArbitraryTime(lookupTime, initialTime, updatePeriod) + } - return -1 + /** + * Gets the last appended chunk in the feed + * @returns A feed chunk + */ + const findLastUpdate = async (initialTime: number, updatePeriod: number): Promise => { + return getUpdate(initialTime, updatePeriod) } /** @@ -109,6 +110,8 @@ export class StreamingFeed implements IStreamingFeed { getIndexForArbitraryTime, getUpdate, getUpdates, + findLastUpdate, + getLastIndex, } } diff --git a/src/streaming.ts b/src/streaming.ts index 17ac6fd..cf9594c 100644 --- a/src/streaming.ts +++ b/src/streaming.ts @@ -41,6 +41,8 @@ export interface SwarmStreamingFeedR extends SwarmFeedHandler { getIndexForArbitraryTime(lookupTime: number, initialTime?: number, updatePeriod?: number): number getUpdate(initialTime: number, updatePeriod: number, lookupTime?: number): Promise getUpdates(initialTime: number, updatePeriod: number): Promise + findLastUpdate(initialTime: number, updatePeriod: number): Promise + getLastIndex(initialTime: number, updatePeriod: number): Promise } /** Swarm Feed Read and Write operations */ diff --git a/test/integration/streaming-feed.spec.ts b/test/integration/streaming-feed.spec.ts index 1e67221..f91c91b 100644 --- a/test/integration/streaming-feed.spec.ts +++ b/test/integration/streaming-feed.spec.ts @@ -1,5 +1,6 @@ import { Bee, Reference, Topic, Utils } from '@ethersphere/bee-js' -import { getCurrentTime, StreamingFeed } from '../../src/streaming-feed' +import { getCurrentTime } from '../../src/getIndexForArbitraryTime' +import { StreamingFeed } from '../../src/streaming-feed' import { assertBytes, Bytes, bytesToHex, HexString, hexToBytes, makePrivateKeySigner } from '../../src/utils' import { beeUrl, getPostageBatch } from '../utils' jest.setTimeout(360 * 1000) @@ -40,6 +41,29 @@ describe('streaming feed', () => { expect(bytesToHex(feedUpdate.owner())).toEqual(owner) }, 21000) + test('findLastUpdate should return last chunk', async () => { + const feedRw = streamingFeed.makeFeedRW(topic, signer) + + const initialTime = getCurrentTime() + const updatePeriod = 5000 + + const feedUpdate = await feedRw.findLastUpdate(initialTime, updatePeriod) + + expect(bytesToHex(feedUpdate.owner())).toEqual(owner) + }, 21000) + + test('getLastIndex should return last index', async () => { + const feedRw = streamingFeed.makeFeedRW(topic, signer) + + const initialTime = getCurrentTime() + const updatePeriod = 5000 + + const index = await feedRw.getLastIndex(initialTime, updatePeriod) + const feedUpdate = await feedRw.findLastUpdate(initialTime, updatePeriod) + + expect(feedUpdate.index).toEqual(index) + }, 21000) + test('multiple updates using setUpdate and lookup', async () => { const reference = Utils.Hex.makeHexString(new Date().getTime().toString().padStart(64, '0'), 64) const referenceBytes = hexToBytes(reference) From d6309fac9a8e8d99a1b49fdda34fbc7674d6a89d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20Levente=20T=C3=B3th?= Date: Tue, 15 Aug 2023 16:51:43 +0200 Subject: [PATCH 12/21] refactor: refactor --- .github/workflows/test.yml | 2 +- src/getIndexForArbitraryTime.ts | 19 --- src/index.ts | 1 + src/streaming-feed/index.ts | 201 ++++++++++++++++++++++++ src/streaming-feed/utils.ts | 127 +++++++++++++++ src/utils.ts | 2 + test/integration/streaming-feed.spec.ts | 4 +- 7 files changed, 334 insertions(+), 22 deletions(-) delete mode 100644 src/getIndexForArbitraryTime.ts create mode 100644 src/streaming-feed/index.ts create mode 100644 src/streaming-feed/utils.ts diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index a57b59c..494b8c9 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -9,7 +9,7 @@ on: - '**' env: - BEE_VERSION: '1.6.2' + BEE_VERSION: '1.13.0' BLOCKCHAIN_VERSION: '1.2.0' WORKERS: 4 BEE_API_URL: 'http://127.0.0.1:1633' diff --git a/src/getIndexForArbitraryTime.ts b/src/getIndexForArbitraryTime.ts deleted file mode 100644 index 12d9cfe..0000000 --- a/src/getIndexForArbitraryTime.ts +++ /dev/null @@ -1,19 +0,0 @@ -export const getCurrentTime = (d = new Date()) => d.getTime() - -/** - * Calculates nearest index - * @param initialTime initial time of streaming feed - * @param updatePeriod streaming feed frequency in milliseconds - * @param lookupTime lookup time - * @returns Returns -1 if not found, otherwise the index - */ -export const getIndexForArbitraryTime = (lookupTime: number, initialTime: number, updatePeriod: number): number => { - const currentTime = getCurrentTime() // Tp - - // the nearest last index to an arbitrary time (Tx) where T0 <= Tx <= Tn <= Tp - if (currentTime >= initialTime && lookupTime >= initialTime) { - return Math.floor((lookupTime - initialTime) / updatePeriod) - } - - return -1 -} diff --git a/src/index.ts b/src/index.ts index f9e7182..c7ef7a0 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,2 +1,3 @@ export * as Feeds from './feed' export { SequentialFeed } from './sequential-feed' +export { StreamingFeed } from './streaming-feed/index' diff --git a/src/streaming-feed/index.ts b/src/streaming-feed/index.ts new file mode 100644 index 0000000..209a64b --- /dev/null +++ b/src/streaming-feed/index.ts @@ -0,0 +1,201 @@ +import { BatchId, Bee, Reference, Signer, Topic, Utils } from '@ethersphere/bee-js' +import { makeTopic } from '../feed' +import { + getIndexForArbitraryTime, + assembleSocPayload, + mapSocToFeed, + StreamingFeedChunk, + IStreamingFeed, + SwarmStreamingFeedR, + SwarmStreamingFeedRW, + FaultTolerantStreamType, +} from './utils' +import { ChunkReference, makeSigner, writeUint64BigEndian, getCurrentTime } from '../utils' + +const { Hex } = Utils +const { hexToBytes } = Hex + +export class StreamingFeed implements IStreamingFeed { + public constructor(public readonly bee: Bee, public type: FaultTolerantStreamType = 'fault-tolerant-stream') {} + + /** + * Creates a streaming feed reader + * @param topic a swarm topic + * @param owner owner + * @returns a streaming feed reader + */ + public makeFeedR( + topic: Topic | Uint8Array | string, + owner: Utils.Eth.EthAddress | Uint8Array | string, + ): SwarmStreamingFeedR { + const socReader = this.bee.makeSOCReader(owner) + const topicHex = makeTopic(topic) + const topicBytes = hexToBytes<32>(topicHex) + const ownerHex = Utils.Eth.makeHexEthAddress(owner) + + /** + * Gets the last index in the feed + * @returns An index number + */ + const getLastIndex = async (initialTime: number, updatePeriod: number): Promise => { + const lookupTime = getCurrentTime() + + return getIndexForArbitraryTime(lookupTime, initialTime, updatePeriod) + } + + /** + * Gets the last appended chunk in the feed + * @returns A feed chunk + */ + const findLastUpdate = async (initialTime: number, updatePeriod: number): Promise => { + return getUpdate(initialTime, updatePeriod) + } + + /** + * Download Feed Chunk at Specific Time + * @param initialTime initial time of streaming feed + * @param updatePeriod streaming feed frequency in milliseconds + * @param lookupTime lookup time + * @returns a StreamingFeedChunk object + */ + const getUpdate = async ( + initialTime: number, + updatePeriod: number, + lookupTime?: number, + ): Promise => { + lookupTime = lookupTime ?? getCurrentTime() + const index = getIndexForArbitraryTime(lookupTime, initialTime, updatePeriod) + const socChunk = await socReader.download(this.getIdentifier(topicBytes, index)) + + return mapSocToFeed(socChunk) + } + + /** + * Download all feed chunks + * @param initialTime initial time of streaming feed + * @param updatePeriod streaming feed frequency in milliseconds + * @returns a StreamingFeedChunk array object + */ + const getUpdates = async (initialTime: number, updatePeriod: number): Promise => { + const feeds: StreamingFeedChunk[] = [] + + try { + let index = getIndexForArbitraryTime(getCurrentTime(), initialTime, updatePeriod) + + index-- + + let lookupTime = getCurrentTime() + let feed + while (index > -1) { + // throws + const socChunk = await socReader.download(this.getIdentifier(topicBytes, index)) + feed = mapSocToFeed(socChunk) + lookupTime -= feed.updatePeriod + + feeds.push(feed) + index = getIndexForArbitraryTime(lookupTime, initialTime, updatePeriod) + index-- + } + + return feeds + } catch (e) { + return feeds + } + } + + return { + type: 'fault-tolerant-stream', + owner: ownerHex, + topic: topicHex, + getIndexForArbitraryTime, + getUpdate, + getUpdates, + findLastUpdate, + getLastIndex, + } + } + + /** + * Creates a streaming feed reader / writer + * @param topic a swarm topic + * @param signer signer + * @returns a streaming feed reader / writer + */ + public makeFeedRW(topic: string | Topic | Uint8Array, signer: string | Uint8Array | Signer): SwarmStreamingFeedRW { + const canonicalSigner = makeSigner(signer) + const topicHex = makeTopic(topic) + const topicBytes = hexToBytes<32>(topicHex) + const feedR = this.makeFeedR(topic, canonicalSigner.address) + const socWriter = this.bee.makeSOCWriter(canonicalSigner) + + /** + * Sets the upload chunk to update + * @param index the chunk index to update + * @param postageBatchId swarm postage batch id + * @param reference chunk reference + * @param initialTime initial time of streaming feed + * @param updatePeriod streaming feed frequency in milliseconds + * @param lookupTime lookup time + * @returns a chunk reference + */ + const setUpdate = async ( + index: number, + postageBatchId: string | BatchId, + reference: Reference, + initialTime: number, + updatePeriod: number, + ): Promise => { + const identifier = this.getIdentifier(topicBytes, index) + + return socWriter.upload( + postageBatchId, + identifier, + assembleSocPayload(hexToBytes(reference) as ChunkReference, { + at: initialTime, + updatePeriod, + index, + }), + ) + } + + /** + * Sets the next upload chunk + * @param postageBatchId swarm postage batch id + * @param reference chunk reference + * @param initialTime initial time of streaming feed + * @param updatePeriod streaming feed frequency in milliseconds + * @param lookupTime lookup time + * @returns a chunk reference + */ + const setLastUpdate = async ( + postageBatchId: string | BatchId, + reference: Reference, + initialTime: number, + updatePeriod: number, + lookupTime: number, + ): Promise => { + lookupTime = lookupTime ?? getCurrentTime() + const lastIndex = feedR.getIndexForArbitraryTime(lookupTime, initialTime, updatePeriod) + + return setUpdate(lastIndex, postageBatchId, reference, initialTime, updatePeriod) + } + + return { + ...feedR, + setLastUpdate, + setUpdate, + } + } + + /** + * Get Single Owner Chunk identifier + * @param topic a swarm topic, bytes 32 length + * @param index the chunk index + * @returns a bytes 32 + */ + public getIdentifier(topic: Utils.Bytes.Bytes<32>, index: number): Utils.Bytes.Bytes<32> { + const indexBytes = writeUint64BigEndian(index) + + return Utils.keccak256Hash(topic, indexBytes) + } +} diff --git a/src/streaming-feed/utils.ts b/src/streaming-feed/utils.ts new file mode 100644 index 0000000..06aa5a5 --- /dev/null +++ b/src/streaming-feed/utils.ts @@ -0,0 +1,127 @@ +import { BatchId, Bee, Reference, Signer, Topic } from '@ethersphere/bee-js' +import type { SingleOwnerChunk } from '@ethersphere/bee-js/dist/src/chunk/soc' +import type { ChunkReference } from '@ethersphere/bee-js/dist/src/feed' +import type { EthAddress } from '@ethersphere/bee-js/dist/src/utils/eth' +import { getCurrentTime } from '../utils' +import { SwarmFeedHandler } from '../feed' +import { Bytes, readUint64BigEndian, serializeBytes, writeUint64BigEndian } from '../utils' + +/** + * Calculates nearest index + * @param initialTime initial time of streaming feed + * @param updatePeriod streaming feed frequency in milliseconds + * @param lookupTime lookup time + * @returns Returns -1 if not found, otherwise the index + */ +export const getIndexForArbitraryTime = (lookupTime: number, initialTime: number, updatePeriod: number): number => { + const currentTime = getCurrentTime() // Tp + + // the nearest last index to an arbitrary time (Tx) where T0 <= Tx <= Tn <= Tp + if (currentTime >= initialTime && lookupTime >= initialTime) { + return Math.floor((lookupTime - initialTime) / updatePeriod) + } + + return -1 +} + +export interface StreamingFeedChunk extends SingleOwnerChunk { + index: number + reference: ChunkReference + timestamp: number + updatePeriod: number +} + +export type FaultTolerantStreamType = 'fault-tolerant-stream' + +/** Interface for feed type classes */ +export interface IStreamingFeed { + /** Feed type identifier */ + readonly type: FaultTolerantStreamType + /** initialised BeeJS instance */ + readonly bee: Bee + /** get Feed interface with read operations */ + makeFeedR( + topic: Topic | Uint8Array | string, + owner: EthAddress | Uint8Array | string, + ...options: any[] + ): SwarmStreamingFeedR + /** get Feed interface with write and read operations */ + makeFeedRW( + topic: Topic | Uint8Array | string, + signer: Signer | Uint8Array | string, + options?: any, + ): SwarmStreamingFeedRW + /** Get Single Owner Chunk identifier */ + getIdentifier(topic: Bytes<32>, index: Index): Bytes<32> +} + +/** Swarm Feed Read operations */ +export interface SwarmStreamingFeedR extends SwarmFeedHandler { + getIndexForArbitraryTime(lookupTime: number, initialTime?: number, updatePeriod?: number): number + getUpdate(initialTime: number, updatePeriod: number, lookupTime?: number): Promise + getUpdates(initialTime: number, updatePeriod: number): Promise + findLastUpdate(initialTime: number, updatePeriod: number): Promise + getLastIndex(initialTime: number, updatePeriod: number): Promise +} + +/** Swarm Feed Read and Write operations */ +export interface SwarmStreamingFeedRW extends SwarmStreamingFeedR { + setLastUpdate( + postageBatchId: string | BatchId, + reference: Reference, + initialTime: number, + updatePeriod: number, + lookupTime?: number, + ): Promise + setUpdate( + index: number, + postageBatchId: string | BatchId, + reference: Reference, + initialTime: number, + updatePeriod: number, + lookupTime?: number, + ): Promise +} + +export function extractDataFromSocPayload(payload: Uint8Array): StreamingFeedChunk { + const index = readUint64BigEndian(payload.slice(0, 8) as Bytes<8>) + const updatePeriod = readUint64BigEndian(payload.slice(8, 16) as Bytes<8>) + const timestamp = readUint64BigEndian(payload.slice(16, 24) as Bytes<8>) + const p = payload.slice(24) // 32 bytes + + if (p.length === 32 || p.length === 64) { + return { + timestamp, + updatePeriod, + index, + reference: p as ChunkReference, + } as any + } + + // TODO handle JSON-like metadata + throw new Error('NotImplemented: payload is longer than expected') +} + +export function mapSocToFeed(socChunk: SingleOwnerChunk): StreamingFeedChunk { + const { reference, timestamp, updatePeriod, index } = extractDataFromSocPayload(socChunk.payload()) + + return { + ...socChunk, + index, + timestamp, + updatePeriod, + reference: reference, + } +} + +export function assembleSocPayload( + reference: ChunkReference, + options: { at: number; updatePeriod: number; index: number }, +): Uint8Array { + const at = options.at ?? Date.now() / 1000.0 + const timestamp = writeUint64BigEndian(at) + const updatePeriod = writeUint64BigEndian(options.updatePeriod) + const chunkIndex = writeUint64BigEndian(options.index) + + return serializeBytes(chunkIndex, updatePeriod, timestamp, reference) +} diff --git a/src/utils.ts b/src/utils.ts index 43c5e9f..7a74e0f 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -193,3 +193,5 @@ export function isStrictlyObject(value: unknown): value is object { export function isObject(value: unknown): value is Record { return value !== null && typeof value === 'object' } + +export const getCurrentTime = (d = new Date()): number => d.getTime() diff --git a/test/integration/streaming-feed.spec.ts b/test/integration/streaming-feed.spec.ts index f91c91b..ee13cd2 100644 --- a/test/integration/streaming-feed.spec.ts +++ b/test/integration/streaming-feed.spec.ts @@ -1,6 +1,6 @@ import { Bee, Reference, Topic, Utils } from '@ethersphere/bee-js' -import { getCurrentTime } from '../../src/getIndexForArbitraryTime' -import { StreamingFeed } from '../../src/streaming-feed' +import { getCurrentTime } from '../../src/utils' +import { StreamingFeed } from '../../src/streaming-feed/index' import { assertBytes, Bytes, bytesToHex, HexString, hexToBytes, makePrivateKeySigner } from '../../src/utils' import { beeUrl, getPostageBatch } from '../utils' jest.setTimeout(360 * 1000) From 36623ad3f935d35df851ac4a2ff31796c6dab0a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20Levente=20T=C3=B3th?= Date: Tue, 15 Aug 2023 18:08:35 +0200 Subject: [PATCH 13/21] fix: time resolution --- src/streaming-feed.ts | 201 ------------------------ src/streaming-feed/index.ts | 2 + src/utils.ts | 2 +- test/integration/streaming-feed.spec.ts | 18 +-- 4 files changed, 12 insertions(+), 211 deletions(-) delete mode 100644 src/streaming-feed.ts diff --git a/src/streaming-feed.ts b/src/streaming-feed.ts deleted file mode 100644 index 3798ff7..0000000 --- a/src/streaming-feed.ts +++ /dev/null @@ -1,201 +0,0 @@ -import { BatchId, Bee, Reference, Signer, Topic, Utils } from '@ethersphere/bee-js' -import { makeTopic } from './feed' -import { getCurrentTime, getIndexForArbitraryTime } from './getIndexForArbitraryTime' -import { - assembleSocPayload, - mapSocToFeed, - StreamingFeedChunk, - IStreamingFeed, - SwarmStreamingFeedR, - SwarmStreamingFeedRW, - FaultTolerantStreamType, -} from './streaming' -import { ChunkReference, makeSigner, writeUint64BigEndian } from './utils' - -const { Hex } = Utils -const { hexToBytes } = Hex - -export class StreamingFeed implements IStreamingFeed { - public constructor(public readonly bee: Bee, public type: FaultTolerantStreamType = 'fault-tolerant-stream') {} - - /** - * Creates a streaming feed reader - * @param topic a swarm topic - * @param owner owner - * @returns a streaming feed reader - */ - public makeFeedR( - topic: Topic | Uint8Array | string, - owner: Utils.Eth.EthAddress | Uint8Array | string, - ): SwarmStreamingFeedR { - const socReader = this.bee.makeSOCReader(owner) - const topicHex = makeTopic(topic) - const topicBytes = hexToBytes<32>(topicHex) - const ownerHex = Utils.Eth.makeHexEthAddress(owner) - - /** - * Gets the last index in the feed - * @returns An index number - */ - const getLastIndex = async (initialTime: number, updatePeriod: number): Promise => { - const lookupTime = getCurrentTime() - - return getIndexForArbitraryTime(lookupTime, initialTime, updatePeriod) - } - - /** - * Gets the last appended chunk in the feed - * @returns A feed chunk - */ - const findLastUpdate = async (initialTime: number, updatePeriod: number): Promise => { - return getUpdate(initialTime, updatePeriod) - } - - /** - * Download Feed Chunk at Specific Time - * @param initialTime initial time of streaming feed - * @param updatePeriod streaming feed frequency in milliseconds - * @param lookupTime lookup time - * @returns a StreamingFeedChunk object - */ - const getUpdate = async ( - initialTime: number, - updatePeriod: number, - lookupTime?: number, - ): Promise => { - lookupTime = lookupTime ?? getCurrentTime() - const index = getIndexForArbitraryTime(lookupTime, initialTime, updatePeriod) - const socChunk = await socReader.download(this.getIdentifier(topicBytes, index)) - - return mapSocToFeed(socChunk) - } - - /** - * Download all feed chunks - * @param initialTime initial time of streaming feed - * @param updatePeriod streaming feed frequency in milliseconds - * @returns a StreamingFeedChunk array object - */ - const getUpdates = async (initialTime: number, updatePeriod: number): Promise => { - const feeds: StreamingFeedChunk[] = [] - - try { - let index = getIndexForArbitraryTime(getCurrentTime(), initialTime, updatePeriod) - - index-- - - let lookupTime = getCurrentTime() - let feed - while (index > -1) { - // throws - const socChunk = await socReader.download(this.getIdentifier(topicBytes, index)) - feed = mapSocToFeed(socChunk) - lookupTime -= feed.updatePeriod - - feeds.push(feed) - index = getIndexForArbitraryTime(lookupTime, initialTime, updatePeriod) - index-- - } - - return feeds - } catch (e) { - return feeds - } - } - - return { - type: 'fault-tolerant-stream', - owner: ownerHex, - topic: topicHex, - getIndexForArbitraryTime, - getUpdate, - getUpdates, - findLastUpdate, - getLastIndex, - } - } - - /** - * Creates a streaming feed reader / writer - * @param topic a swarm topic - * @param signer signer - * @returns a streaming feed reader / writer - */ - public makeFeedRW(topic: string | Topic | Uint8Array, signer: string | Uint8Array | Signer): SwarmStreamingFeedRW { - const canonicalSigner = makeSigner(signer) - const topicHex = makeTopic(topic) - const topicBytes = hexToBytes<32>(topicHex) - const feedR = this.makeFeedR(topic, canonicalSigner.address) - const socWriter = this.bee.makeSOCWriter(canonicalSigner) - - /** - * Sets the upload chunk to update - * @param index the chunk index to update - * @param postageBatchId swarm postage batch id - * @param reference chunk reference - * @param initialTime initial time of streaming feed - * @param updatePeriod streaming feed frequency in milliseconds - * @param lookupTime lookup time - * @returns a chunk reference - */ - const setUpdate = async ( - index: number, - postageBatchId: string | BatchId, - reference: Reference, - initialTime: number, - updatePeriod: number, - ): Promise => { - const identifier = this.getIdentifier(topicBytes, index) - - return socWriter.upload( - postageBatchId, - identifier, - assembleSocPayload(hexToBytes(reference) as ChunkReference, { - at: initialTime, - updatePeriod, - index, - }), - ) - } - - /** - * Sets the next upload chunk - * @param postageBatchId swarm postage batch id - * @param reference chunk reference - * @param initialTime initial time of streaming feed - * @param updatePeriod streaming feed frequency in milliseconds - * @param lookupTime lookup time - * @returns a chunk reference - */ - const setLastUpdate = async ( - postageBatchId: string | BatchId, - reference: Reference, - initialTime: number, - updatePeriod: number, - lookupTime: number, - ): Promise => { - lookupTime = lookupTime ?? getCurrentTime() - const lastIndex = feedR.getIndexForArbitraryTime(lookupTime, initialTime, updatePeriod) - - return setUpdate(lastIndex, postageBatchId, reference, initialTime, updatePeriod) - } - - return { - ...feedR, - setLastUpdate, - setUpdate, - } - } - - /** - * Get Single Owner Chunk identifier - * @param topic a swarm topic, bytes 32 length - * @param index the chunk index - * @returns a bytes 32 - */ - public getIdentifier(topic: Utils.Bytes.Bytes<32>, index: number): Utils.Bytes.Bytes<32> { - const indexBytes = writeUint64BigEndian(index) - - return Utils.keccak256Hash(topic, indexBytes) - } -} diff --git a/src/streaming-feed/index.ts b/src/streaming-feed/index.ts index 209a64b..db59cba 100644 --- a/src/streaming-feed/index.ts +++ b/src/streaming-feed/index.ts @@ -56,12 +56,14 @@ export class StreamingFeed implements IStreamingFeed { * @param initialTime initial time of streaming feed * @param updatePeriod streaming feed frequency in milliseconds * @param lookupTime lookup time + * @param discover boolean, indicates whether the algorithm will look for the closest successful hit * @returns a StreamingFeedChunk object */ const getUpdate = async ( initialTime: number, updatePeriod: number, lookupTime?: number, + discover = true, ): Promise => { lookupTime = lookupTime ?? getCurrentTime() const index = getIndexForArbitraryTime(lookupTime, initialTime, updatePeriod) diff --git a/src/utils.ts b/src/utils.ts index 7a74e0f..27282d6 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -194,4 +194,4 @@ export function isObject(value: unknown): value is Record { return value !== null && typeof value === 'object' } -export const getCurrentTime = (d = new Date()): number => d.getTime() +export const getCurrentTime = (d = new Date()): number => Math.floor(d.getTime() / 1000) // ms to s diff --git a/test/integration/streaming-feed.spec.ts b/test/integration/streaming-feed.spec.ts index ee13cd2..35a12f8 100644 --- a/test/integration/streaming-feed.spec.ts +++ b/test/integration/streaming-feed.spec.ts @@ -30,7 +30,7 @@ describe('streaming feed', () => { const feedRw = streamingFeed.makeFeedRW(topic, signer) const initialTime = getCurrentTime() - const updatePeriod = 5000 + const updatePeriod = 5 const testReference: Reference = '0000000000000000000000000000000000000000000000000000000000000126' as HexString<64> await feedRw.setLastUpdate(batchId, testReference, initialTime, updatePeriod) @@ -45,7 +45,7 @@ describe('streaming feed', () => { const feedRw = streamingFeed.makeFeedRW(topic, signer) const initialTime = getCurrentTime() - const updatePeriod = 5000 + const updatePeriod = 5 const feedUpdate = await feedRw.findLastUpdate(initialTime, updatePeriod) @@ -71,7 +71,7 @@ describe('streaming feed', () => { const random = new Date().getTime().toString().padStart(64, '0') const multipleUpdateTopic = Utils.Hex.makeHexString(random) as Topic - const updatePeriod = 5000 + const updatePeriod = 5 const feedRw = streamingFeed.makeFeedRW(multipleUpdateTopic, signer) const numUpdates = 5 @@ -93,11 +93,11 @@ describe('streaming feed', () => { } const feedUpdateResponse = await feedRw.getUpdates(initialTime, updatePeriod) - expect(feedUpdateResponse.length).toEqual(5) - expect(feedUpdateResponse[0].updatePeriod).toEqual(5000) - expect(feedUpdateResponse[1].updatePeriod).toEqual(5000) - expect(feedUpdateResponse[2].updatePeriod).toEqual(5000) - expect(feedUpdateResponse[3].updatePeriod).toEqual(5000) - expect(feedUpdateResponse[4].updatePeriod).toEqual(5000) + expect(feedUpdateResponse.length).toEqual(numUpdates) + expect(feedUpdateResponse[0].updatePeriod).toEqual(updatePeriod) + expect(feedUpdateResponse[1].updatePeriod).toEqual(updatePeriod) + expect(feedUpdateResponse[2].updatePeriod).toEqual(updatePeriod) + expect(feedUpdateResponse[3].updatePeriod).toEqual(updatePeriod) + expect(feedUpdateResponse[4].updatePeriod).toEqual(updatePeriod) }, 45000) }) From 2b2b629dee5123cce72564ac3bfd462eaa50f98f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20Levente=20T=C3=B3th?= Date: Fri, 18 Aug 2023 18:16:08 +0200 Subject: [PATCH 14/21] revert: sec to ms --- .github/workflows/test.yml | 2 +- src/utils.ts | 2 +- test/integration/streaming-feed.spec.ts | 12 ++++++------ 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 494b8c9..f29eff1 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -23,7 +23,7 @@ jobs: strategy: matrix: - node-version: [12.x, 14.x, 16.x] + node-version: [14.x, 16.x, 18.x] steps: - name: Checkout diff --git a/src/utils.ts b/src/utils.ts index 27282d6..7a74e0f 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -194,4 +194,4 @@ export function isObject(value: unknown): value is Record { return value !== null && typeof value === 'object' } -export const getCurrentTime = (d = new Date()): number => Math.floor(d.getTime() / 1000) // ms to s +export const getCurrentTime = (d = new Date()): number => d.getTime() diff --git a/test/integration/streaming-feed.spec.ts b/test/integration/streaming-feed.spec.ts index 35a12f8..2f2a63d 100644 --- a/test/integration/streaming-feed.spec.ts +++ b/test/integration/streaming-feed.spec.ts @@ -30,7 +30,7 @@ describe('streaming feed', () => { const feedRw = streamingFeed.makeFeedRW(topic, signer) const initialTime = getCurrentTime() - const updatePeriod = 5 + const updatePeriod = 5000 const testReference: Reference = '0000000000000000000000000000000000000000000000000000000000000126' as HexString<64> await feedRw.setLastUpdate(batchId, testReference, initialTime, updatePeriod) @@ -45,7 +45,7 @@ describe('streaming feed', () => { const feedRw = streamingFeed.makeFeedRW(topic, signer) const initialTime = getCurrentTime() - const updatePeriod = 5 + const updatePeriod = 5000 const feedUpdate = await feedRw.findLastUpdate(initialTime, updatePeriod) @@ -71,23 +71,23 @@ describe('streaming feed', () => { const random = new Date().getTime().toString().padStart(64, '0') const multipleUpdateTopic = Utils.Hex.makeHexString(random) as Topic - const updatePeriod = 5 + const updatePeriod = 5000 const feedRw = streamingFeed.makeFeedRW(multipleUpdateTopic, signer) const numUpdates = 5 const initialTime = getCurrentTime() - const sleep = async (seconds: number) => + const sleep = async (ms: number) => new Promise(resolve => { - setTimeout(() => resolve(true), seconds * 1000) + setTimeout(() => resolve(true), ms) }) let lookupTime = getCurrentTime() for (let i = 0; i < 0 + numUpdates; i++) { const referenceI = new Uint8Array([i, ...referenceBytes.slice(1)]) as Bytes<32> await feedRw.setLastUpdate(batchId, Utils.Hex.bytesToHex(referenceI), initialTime, updatePeriod, lookupTime) - await sleep(5) + await sleep(updatePeriod) await feedRw.getUpdate(initialTime, updatePeriod, lookupTime) lookupTime = getCurrentTime() } From a224273fcd56b2368828d275fbc161780f413313 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20Levente=20T=C3=B3th?= Date: Fri, 18 Aug 2023 18:23:04 +0200 Subject: [PATCH 15/21] refactor: make getLastIndex synchronous --- src/streaming-feed/index.ts | 2 +- src/streaming-feed/utils.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/streaming-feed/index.ts b/src/streaming-feed/index.ts index db59cba..e078cc8 100644 --- a/src/streaming-feed/index.ts +++ b/src/streaming-feed/index.ts @@ -37,7 +37,7 @@ export class StreamingFeed implements IStreamingFeed { * Gets the last index in the feed * @returns An index number */ - const getLastIndex = async (initialTime: number, updatePeriod: number): Promise => { + const getLastIndex = (initialTime: number, updatePeriod: number): number => { const lookupTime = getCurrentTime() return getIndexForArbitraryTime(lookupTime, initialTime, updatePeriod) diff --git a/src/streaming-feed/utils.ts b/src/streaming-feed/utils.ts index 06aa5a5..2b2c5d9 100644 --- a/src/streaming-feed/utils.ts +++ b/src/streaming-feed/utils.ts @@ -61,7 +61,7 @@ export interface SwarmStreamingFeedR extends SwarmFeedHandler { getUpdate(initialTime: number, updatePeriod: number, lookupTime?: number): Promise getUpdates(initialTime: number, updatePeriod: number): Promise findLastUpdate(initialTime: number, updatePeriod: number): Promise - getLastIndex(initialTime: number, updatePeriod: number): Promise + getLastIndex(initialTime: number, updatePeriod: number): number } /** Swarm Feed Read and Write operations */ From 327c39b1dc19120a8277be57345ecbcad959e0f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20Levente=20T=C3=B3th?= Date: Mon, 21 Aug 2023 17:18:40 +0200 Subject: [PATCH 16/21] fix: discover updates --- src/feed.ts | 6 +- src/streaming-feed/index.ts | 137 ++++++++++-------------- src/streaming-feed/utils.ts | 107 ++---------------- test/integration/streaming-feed.spec.ts | 73 ++++--------- test/utils.ts | 12 ++- 5 files changed, 101 insertions(+), 234 deletions(-) diff --git a/src/feed.ts b/src/feed.ts index 997d926..034e1df 100644 --- a/src/feed.ts +++ b/src/feed.ts @@ -69,10 +69,10 @@ export interface SwarmFeed { /** Swarm Feed Read operations */ export interface SwarmFeedR extends SwarmFeedHandler { - getLastIndex(): Promise | Index + getLastIndex(...params: unknown[]): Promise | Index findLastUpdate(options?: any): Promise> - getUpdate(index: Index): Promise> - getUpdates(indices: Index[]): Promise[]> + getUpdate(index?: Index): Promise> + getUpdates(indices?: Index[]): Promise[]> } /** Swarm Feed Read and operations */ diff --git a/src/streaming-feed/index.ts b/src/streaming-feed/index.ts index e078cc8..7a58f86 100644 --- a/src/streaming-feed/index.ts +++ b/src/streaming-feed/index.ts @@ -1,22 +1,20 @@ import { BatchId, Bee, Reference, Signer, Topic, Utils } from '@ethersphere/bee-js' -import { makeTopic } from '../feed' -import { - getIndexForArbitraryTime, - assembleSocPayload, - mapSocToFeed, - StreamingFeedChunk, - IStreamingFeed, - SwarmStreamingFeedR, - SwarmStreamingFeedRW, - FaultTolerantStreamType, -} from './utils' +import { assembleSocPayload, FeedChunk, makeTopic, mapSocToFeed, SwarmFeed } from '../feed' +import { getIndexForArbitraryTime, SwarmStreamingFeedRW, FaultTolerantStreamType, SwarmStreamingFeedR } from './utils' import { ChunkReference, makeSigner, writeUint64BigEndian, getCurrentTime } from '../utils' const { Hex } = Utils const { hexToBytes } = Hex -export class StreamingFeed implements IStreamingFeed { - public constructor(public readonly bee: Bee, public type: FaultTolerantStreamType = 'fault-tolerant-stream') {} +export class StreamingFeed implements SwarmFeed { + public readonly type: FaultTolerantStreamType = 'fault-tolerant-stream' + + /** + * @param bee initialized BeeJS Bee instance + * @param initialTime initial time of streaming feed + * @param updatePeriod streaming feed frequency in milliseconds + */ + public constructor(public readonly bee: Bee, private initialTime: number, private updatePeriod: number) {} /** * Creates a streaming feed reader @@ -37,79 +35,78 @@ export class StreamingFeed implements IStreamingFeed { * Gets the last index in the feed * @returns An index number */ - const getLastIndex = (initialTime: number, updatePeriod: number): number => { + const getLastIndex = (): number => { const lookupTime = getCurrentTime() - return getIndexForArbitraryTime(lookupTime, initialTime, updatePeriod) + return getIndexForArbitraryTime(lookupTime, this.initialTime, this.updatePeriod) } /** * Gets the last appended chunk in the feed * @returns A feed chunk */ - const findLastUpdate = async (initialTime: number, updatePeriod: number): Promise => { - return getUpdate(initialTime, updatePeriod) + const findLastUpdate = async (): Promise => { + return getUpdate() } /** * Download Feed Chunk at Specific Time - * @param initialTime initial time of streaming feed - * @param updatePeriod streaming feed frequency in milliseconds * @param lookupTime lookup time - * @param discover boolean, indicates whether the algorithm will look for the closest successful hit + * @param discover indicates whether the algorithm will look for the closest successful hit * @returns a StreamingFeedChunk object */ - const getUpdate = async ( - initialTime: number, - updatePeriod: number, - lookupTime?: number, - discover = true, - ): Promise => { + const getUpdate = async (lookupTime?: number, discover = true): Promise => { lookupTime = lookupTime ?? getCurrentTime() - const index = getIndexForArbitraryTime(lookupTime, initialTime, updatePeriod) - const socChunk = await socReader.download(this.getIdentifier(topicBytes, index)) + let index = getIndexForArbitraryTime(lookupTime, this.initialTime, this.updatePeriod) + while (index >= 0) { + try { + const socChunk = await socReader.download(this.getIdentifier(topicBytes, index)) + + return mapSocToFeed(socChunk, index) + } catch (e) { + if (!discover) throw e + + index-- + } + } - return mapSocToFeed(socChunk) + throw new Error(`There is no update found in the feed`) } /** * Download all feed chunks - * @param initialTime initial time of streaming feed - * @param updatePeriod streaming feed frequency in milliseconds + * * @returns a StreamingFeedChunk array object */ - const getUpdates = async (initialTime: number, updatePeriod: number): Promise => { - const feeds: StreamingFeedChunk[] = [] + const getUpdates = async (): Promise => { + const feeds: FeedChunk[] = [] - try { - let index = getIndexForArbitraryTime(getCurrentTime(), initialTime, updatePeriod) - - index-- + let index = getIndexForArbitraryTime(getCurrentTime(), this.initialTime, this.updatePeriod) - let lookupTime = getCurrentTime() - let feed - while (index > -1) { - // throws + let feed: FeedChunk + while (index >= 0) { + try { const socChunk = await socReader.download(this.getIdentifier(topicBytes, index)) - feed = mapSocToFeed(socChunk) - lookupTime -= feed.updatePeriod - + feed = mapSocToFeed(socChunk, index) feeds.push(feed) - index = getIndexForArbitraryTime(lookupTime, initialTime, updatePeriod) - index-- + } catch (e) { + // NOOP } - - return feeds - } catch (e) { - return feeds + index-- } + + return feeds + } + + const getIndexForArbitraryTimeWrapper = (lookupTime: number) => { + return getIndexForArbitraryTime(lookupTime, this.initialTime, this.updatePeriod) } return { type: 'fault-tolerant-stream', owner: ownerHex, topic: topicHex, - getIndexForArbitraryTime, + getIndexForArbitraryTime: getIndexForArbitraryTimeWrapper, getUpdate, getUpdates, findLastUpdate, @@ -132,54 +129,32 @@ export class StreamingFeed implements IStreamingFeed { /** * Sets the upload chunk to update - * @param index the chunk index to update + * @param lookupTime lookup time in ms * @param postageBatchId swarm postage batch id * @param reference chunk reference - * @param initialTime initial time of streaming feed - * @param updatePeriod streaming feed frequency in milliseconds - * @param lookupTime lookup time * @returns a chunk reference */ const setUpdate = async ( - index: number, + lookupTime: number, postageBatchId: string | BatchId, reference: Reference, - initialTime: number, - updatePeriod: number, ): Promise => { + const index = feedR.getIndexForArbitraryTime(lookupTime) const identifier = this.getIdentifier(topicBytes, index) - return socWriter.upload( - postageBatchId, - identifier, - assembleSocPayload(hexToBytes(reference) as ChunkReference, { - at: initialTime, - updatePeriod, - index, - }), - ) + return socWriter.upload(postageBatchId, identifier, assembleSocPayload(hexToBytes(reference) as ChunkReference)) } /** * Sets the next upload chunk + * + * @param lookupTime lookup time in millisec * @param postageBatchId swarm postage batch id * @param reference chunk reference - * @param initialTime initial time of streaming feed - * @param updatePeriod streaming feed frequency in milliseconds - * @param lookupTime lookup time * @returns a chunk reference */ - const setLastUpdate = async ( - postageBatchId: string | BatchId, - reference: Reference, - initialTime: number, - updatePeriod: number, - lookupTime: number, - ): Promise => { - lookupTime = lookupTime ?? getCurrentTime() - const lastIndex = feedR.getIndexForArbitraryTime(lookupTime, initialTime, updatePeriod) - - return setUpdate(lastIndex, postageBatchId, reference, initialTime, updatePeriod) + const setLastUpdate = async (postageBatchId: string | BatchId, reference: Reference): Promise => { + return setUpdate(getCurrentTime(), postageBatchId, reference) } return { diff --git a/src/streaming-feed/utils.ts b/src/streaming-feed/utils.ts index 2b2c5d9..e480649 100644 --- a/src/streaming-feed/utils.ts +++ b/src/streaming-feed/utils.ts @@ -1,10 +1,6 @@ -import { BatchId, Bee, Reference, Signer, Topic } from '@ethersphere/bee-js' -import type { SingleOwnerChunk } from '@ethersphere/bee-js/dist/src/chunk/soc' -import type { ChunkReference } from '@ethersphere/bee-js/dist/src/feed' -import type { EthAddress } from '@ethersphere/bee-js/dist/src/utils/eth' +import { BatchId, Reference } from '@ethersphere/bee-js' import { getCurrentTime } from '../utils' -import { SwarmFeedHandler } from '../feed' -import { Bytes, readUint64BigEndian, serializeBytes, writeUint64BigEndian } from '../utils' +import { FeedChunk, SwarmFeedR } from '../feed' /** * Calculates nearest index @@ -24,104 +20,17 @@ export const getIndexForArbitraryTime = (lookupTime: number, initialTime: number return -1 } -export interface StreamingFeedChunk extends SingleOwnerChunk { - index: number - reference: ChunkReference - timestamp: number - updatePeriod: number -} - export type FaultTolerantStreamType = 'fault-tolerant-stream' -/** Interface for feed type classes */ -export interface IStreamingFeed { - /** Feed type identifier */ - readonly type: FaultTolerantStreamType - /** initialised BeeJS instance */ - readonly bee: Bee - /** get Feed interface with read operations */ - makeFeedR( - topic: Topic | Uint8Array | string, - owner: EthAddress | Uint8Array | string, - ...options: any[] - ): SwarmStreamingFeedR - /** get Feed interface with write and read operations */ - makeFeedRW( - topic: Topic | Uint8Array | string, - signer: Signer | Uint8Array | string, - options?: any, - ): SwarmStreamingFeedRW - /** Get Single Owner Chunk identifier */ - getIdentifier(topic: Bytes<32>, index: Index): Bytes<32> -} - /** Swarm Feed Read operations */ -export interface SwarmStreamingFeedR extends SwarmFeedHandler { - getIndexForArbitraryTime(lookupTime: number, initialTime?: number, updatePeriod?: number): number - getUpdate(initialTime: number, updatePeriod: number, lookupTime?: number): Promise - getUpdates(initialTime: number, updatePeriod: number): Promise - findLastUpdate(initialTime: number, updatePeriod: number): Promise - getLastIndex(initialTime: number, updatePeriod: number): number +export interface SwarmStreamingFeedR extends SwarmFeedR { + getIndexForArbitraryTime(lookupTime: number): number + getLastIndex(): number + getUpdate(timeStamp?: number): Promise } /** Swarm Feed Read and Write operations */ export interface SwarmStreamingFeedRW extends SwarmStreamingFeedR { - setLastUpdate( - postageBatchId: string | BatchId, - reference: Reference, - initialTime: number, - updatePeriod: number, - lookupTime?: number, - ): Promise - setUpdate( - index: number, - postageBatchId: string | BatchId, - reference: Reference, - initialTime: number, - updatePeriod: number, - lookupTime?: number, - ): Promise -} - -export function extractDataFromSocPayload(payload: Uint8Array): StreamingFeedChunk { - const index = readUint64BigEndian(payload.slice(0, 8) as Bytes<8>) - const updatePeriod = readUint64BigEndian(payload.slice(8, 16) as Bytes<8>) - const timestamp = readUint64BigEndian(payload.slice(16, 24) as Bytes<8>) - const p = payload.slice(24) // 32 bytes - - if (p.length === 32 || p.length === 64) { - return { - timestamp, - updatePeriod, - index, - reference: p as ChunkReference, - } as any - } - - // TODO handle JSON-like metadata - throw new Error('NotImplemented: payload is longer than expected') -} - -export function mapSocToFeed(socChunk: SingleOwnerChunk): StreamingFeedChunk { - const { reference, timestamp, updatePeriod, index } = extractDataFromSocPayload(socChunk.payload()) - - return { - ...socChunk, - index, - timestamp, - updatePeriod, - reference: reference, - } -} - -export function assembleSocPayload( - reference: ChunkReference, - options: { at: number; updatePeriod: number; index: number }, -): Uint8Array { - const at = options.at ?? Date.now() / 1000.0 - const timestamp = writeUint64BigEndian(at) - const updatePeriod = writeUint64BigEndian(options.updatePeriod) - const chunkIndex = writeUint64BigEndian(options.index) - - return serializeBytes(chunkIndex, updatePeriod, timestamp, reference) + setLastUpdate(postageBatchId: string | BatchId, reference: Reference): Promise + setUpdate(lookupTime: number, postageBatchId: string | BatchId, reference: Reference): Promise } diff --git a/test/integration/streaming-feed.spec.ts b/test/integration/streaming-feed.spec.ts index 2f2a63d..4293576 100644 --- a/test/integration/streaming-feed.spec.ts +++ b/test/integration/streaming-feed.spec.ts @@ -2,7 +2,7 @@ import { Bee, Reference, Topic, Utils } from '@ethersphere/bee-js' import { getCurrentTime } from '../../src/utils' import { StreamingFeed } from '../../src/streaming-feed/index' import { assertBytes, Bytes, bytesToHex, HexString, hexToBytes, makePrivateKeySigner } from '../../src/utils' -import { beeUrl, getPostageBatch } from '../utils' +import { beeUrl, getPostageBatch, randomTopic, sleep } from '../utils' jest.setTimeout(360 * 1000) describe('streaming feed', () => { const testIdentity = { @@ -16,88 +16,61 @@ describe('streaming feed', () => { const signer = makePrivateKeySigner(hexToBytes(testIdentity.privateKey) as Bytes<32>) const bee = new Bee(beeUrl()) const batchId = getPostageBatch() - const streamingFeed = new StreamingFeed(bee) + const updatePeriod = 1000 + const streamingFeedFactory = (initialTime: number, updatePeriod: number) => { + return new StreamingFeed(bee, initialTime, updatePeriod) + } - test('lookup for empty feed update', () => { - const emptyTopic = '1200000000000000000000000000000000000000000000000000000000000001' as Topic + test('lookup for empty feed update', async () => { + const streamingFeed = streamingFeedFactory(getCurrentTime(), updatePeriod) + const emptyTopic = randomTopic(0) const feedR = streamingFeed.makeFeedR(emptyTopic, testIdentity.address) - const lastIndex = feedR.getIndexForArbitraryTime(getCurrentTime()) - expect(lastIndex).toBe(-1) + await expect(feedR.findLastUpdate()).rejects.toThrow('There is no update found in the feed') }, 40000) test('setLastupdate then lookup', async () => { + const initialTime = getCurrentTime() + const streamingFeed = streamingFeedFactory(initialTime, updatePeriod) const feedRw = streamingFeed.makeFeedRW(topic, signer) - const initialTime = getCurrentTime() - const updatePeriod = 5000 const testReference: Reference = '0000000000000000000000000000000000000000000000000000000000000126' as HexString<64> - await feedRw.setLastUpdate(batchId, testReference, initialTime, updatePeriod) + await feedRw.setLastUpdate(batchId, testReference) - const feedUpdate = await feedRw.getUpdate(initialTime, updatePeriod) - const lastIndex = feedRw.getIndexForArbitraryTime(getCurrentTime(), initialTime, updatePeriod) + const feedUpdate = await feedRw.getUpdate(initialTime) - expect(feedUpdate.index).toEqual(lastIndex) expect(bytesToHex(feedUpdate.owner())).toEqual(owner) - }, 21000) - - test('findLastUpdate should return last chunk', async () => { - const feedRw = streamingFeed.makeFeedRW(topic, signer) - - const initialTime = getCurrentTime() - const updatePeriod = 5000 - - const feedUpdate = await feedRw.findLastUpdate(initialTime, updatePeriod) + const feedUpdate2 = await feedRw.findLastUpdate() + expect(feedUpdate.index).toEqual(feedUpdate2.index) expect(bytesToHex(feedUpdate.owner())).toEqual(owner) - }, 21000) - - test('getLastIndex should return last index', async () => { - const feedRw = streamingFeed.makeFeedRW(topic, signer) - - const initialTime = getCurrentTime() - const updatePeriod = 5000 - - const index = await feedRw.getLastIndex(initialTime, updatePeriod) - const feedUpdate = await feedRw.findLastUpdate(initialTime, updatePeriod) + const index = feedRw.getLastIndex() expect(feedUpdate.index).toEqual(index) }, 21000) test('multiple updates using setUpdate and lookup', async () => { + const streamingFeed = streamingFeedFactory(getCurrentTime(), updatePeriod) + const reference = Utils.Hex.makeHexString(new Date().getTime().toString().padStart(64, '0'), 64) const referenceBytes = hexToBytes(reference) assertBytes(referenceBytes, 32) - const random = new Date().getTime().toString().padStart(64, '0') - const multipleUpdateTopic = Utils.Hex.makeHexString(random) as Topic + const multipleUpdateTopic = randomTopic(2) - const updatePeriod = 5000 const feedRw = streamingFeed.makeFeedRW(multipleUpdateTopic, signer) const numUpdates = 5 - const initialTime = getCurrentTime() - - const sleep = async (ms: number) => - new Promise(resolve => { - setTimeout(() => resolve(true), ms) - }) let lookupTime = getCurrentTime() - for (let i = 0; i < 0 + numUpdates; i++) { + for (let i = 0; i < numUpdates; i++) { const referenceI = new Uint8Array([i, ...referenceBytes.slice(1)]) as Bytes<32> - - await feedRw.setLastUpdate(batchId, Utils.Hex.bytesToHex(referenceI), initialTime, updatePeriod, lookupTime) + await feedRw.setLastUpdate(batchId, Utils.Hex.bytesToHex(referenceI)) await sleep(updatePeriod) - await feedRw.getUpdate(initialTime, updatePeriod, lookupTime) + await feedRw.getUpdate(lookupTime) lookupTime = getCurrentTime() } - const feedUpdateResponse = await feedRw.getUpdates(initialTime, updatePeriod) + const feedUpdateResponse = await feedRw.getUpdates() expect(feedUpdateResponse.length).toEqual(numUpdates) - expect(feedUpdateResponse[0].updatePeriod).toEqual(updatePeriod) - expect(feedUpdateResponse[1].updatePeriod).toEqual(updatePeriod) - expect(feedUpdateResponse[2].updatePeriod).toEqual(updatePeriod) - expect(feedUpdateResponse[3].updatePeriod).toEqual(updatePeriod) - expect(feedUpdateResponse[4].updatePeriod).toEqual(updatePeriod) }, 45000) }) diff --git a/test/utils.ts b/test/utils.ts index 201a7f7..10d8ee2 100644 --- a/test/utils.ts +++ b/test/utils.ts @@ -1,4 +1,4 @@ -import { BatchId } from '@ethersphere/bee-js' +import { BatchId, Topic } from '@ethersphere/bee-js' /** * Returns a url for testing the Bee public API @@ -46,3 +46,13 @@ export function getPostageBatch(url = beeDebugUrl()): BatchId { return stamp } + +export function randomTopic(seed: number): Topic { + return (new Date().getTime() + seed).toString().padStart(64, '0') as Topic +} + +export async function sleep(ms: number): Promise { + return new Promise(resolve => { + setTimeout(resolve, ms) + }) +} From 9858cb2bd888093f7c7e3a9129bc118f975dc1ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20Levente=20T=C3=B3th?= Date: Mon, 21 Aug 2023 18:04:31 +0200 Subject: [PATCH 17/21] docs: fix --- README.md | 56 +++++++++------------ src/sequential-feed.ts | 18 +++---- src/streaming.ts | 108 ----------------------------------------- 3 files changed, 32 insertions(+), 150 deletions(-) delete mode 100644 src/streaming.ts diff --git a/README.md b/README.md index 654dbcb..598d903 100644 --- a/README.md +++ b/README.md @@ -24,8 +24,8 @@ Creates a new Feed reader | Name | Type | Description | | ---- | ---- | ----------- | -| `topic` | `Topic | Uint8Array | string` | The feeds topic | -| `owner` | `EthAddress | Uint8Array | string` | Address of signer | +| `topic` | `Topic \| Uint8Array \| string` | The feeds topic | +| `owner` | `EthAddress \| Uint8Array \| string` | Address of signer | | `options` | `any ` | Options | ### Returns @@ -83,13 +83,13 @@ A FeedChunk object ## getUpdate -Gets a chunk update from a index +Gets the chunk update the given index ### Arguments | Name | Type | Description | | ---- | ---- | ----------- | -| `index` | `Index ` | Options | +| `index` | `Index` | Options | ### Returns @@ -110,7 +110,7 @@ Gets a set of chunks ### Returns -A FeedChunk object +A FeedChunk[] array ## Feed Writer @@ -192,6 +192,14 @@ A Reference object ## Streaming feeds +### Arguments + +| Name | Type | Description | +| ---- | ---- | ----------- | +| `bee` | `Bee` | Bee instance | +| `initialTime` | `number ` | initial time of streaming feed | +| `updatePeriod` | `number ` | feed update frequency | + ## Feed Reader ## makeFeedR @@ -222,7 +230,9 @@ const myIdentity = { address: '8d3766440f0d7b949a5e32995d09619a7f86e632' as HexString, } const topic = '0000000000000000000000000000000000000000000000000000000000000000' as Topic -const streamingFeed = new StreamingFeed(new Bee('http://localhost:1633')) +const initialTime = Date.now() +const period = 5000 // ms +const streamingFeed = new StreamingFeed(new Bee('http://localhost:1633'), initialTime, period) // Feed Reader const emptyTopic = '1200000000000000000000000000000000000000000000000000000000000001' as Topic @@ -238,8 +248,6 @@ Gets an index from an arbitrary time | Name | Type | Description | | ---- | ---- | ----------- | | `lookupTime` | `number ` | Time position to lookup | -| `initialTime` | `number ` | feed chunk timestamp for index at 0 | -| `updatePeriod` | `number ` | feed update frequency | ### Returns @@ -254,8 +262,6 @@ Gets a chunk update from an arbitrary time, if lookup time is empty, it will ret | Name | Type | Description | | ---- | ---- | ----------- | -| `initialTime` | `number ` | feed chunk timestamp for index at 0 | -| `updatePeriod` | `number ` | feed update frequency | | `lookupTime` | `number ` | Time position to lookup (optional) | @@ -268,17 +274,10 @@ A StreamingFeedChunk object Gets a set of chunks from an arbitray time -### Arguments - -| Name | Type | Description | -| ---- | ---- | ----------- | -| `initialTime` | `number ` | feed chunk timestamp for index at 0 | -| `updatePeriod` | `number ` | feed update frequency | - ### Returns -A StreamingFeedChunk object +A StreamingFeedChunk array ## Feed Writer @@ -314,7 +313,9 @@ const myIdentity = { } const signer = makePrivateKeySigner(hexToBytes(myIdentity.privateKey) as Bytes<32>) const topic = '0000000000000000000000000000000000000000000000000000000000000000' as Topic -const streamingFeed = new StreamingFeed(new Bee('http://localhost:1633')) +const initialTime = Date.now() +const period = 5000 // ms +const streamingFeed = new StreamingFeed(new Bee('http://localhost:1633'), initialTime, period) // Feed Reader/Writer const feedRw = streamingFeed.makeFeedRW(topic, signer) @@ -330,11 +331,6 @@ Appends a new chunk to a feed, if the lookup time is empty, it will be added to | ---- | ---- | ----------- | | `postageBatchId` | `string | BatchId ` | postage batch id | | `reference` | `Reference` | reference | -| `initialTime` | `number ` | feed chunk timestamp for index at 0 | -| `updatePeriod` | `number ` | feed update frequency | -| `lookupTime` | `number ` | Time position to lookup (optional) | - - ### Returns @@ -350,15 +346,9 @@ Sets a chunk to a feed at an index number | Name | Type | Description | | ---- | ---- | ----------- | | `index` | `number` | index | -| `postageBatchId` | `string | BatchId ` | postage batch id | -| `reference` | `Reference` | reference | -| `initialTime` | `number ` | feed chunk timestamp for index at 0 | -| `updatePeriod` | `number ` | feed update frequency | -| `lookupTime` | `number ` | Time position to lookup (optional) | - - +| `postageBatchId` | `string \| BatchId` | postage batch id | +| `reference` | `Reference` | wrapped chunk reference | ### Returns -A Reference object - +A Reference of the Feed chunk diff --git a/src/sequential-feed.ts b/src/sequential-feed.ts index 7e2a682..38b6eed 100644 --- a/src/sequential-feed.ts +++ b/src/sequential-feed.ts @@ -71,7 +71,7 @@ export class SequentialFeed implements SwarmFeed { /** * Downloads a chunk by index number - * @param index index number + * @param index index number of the feed chunk * @returns A feed chunk */ const getUpdate = async (index: number): Promise => { @@ -81,9 +81,9 @@ export class SequentialFeed implements SwarmFeed { } /** - * Download all chunk by indices - * @param indices an array of index numbers - * @returns An array of chunks + * Download all chunks by indices + * @param indices an array of indices + * @returns An array of feed chunks */ const getUpdates = async (indices: number[]): Promise => { const promises: Promise[] = [] @@ -126,8 +126,8 @@ export class SequentialFeed implements SwarmFeed { * Sets the upload chunk to update * @param index the chunk index to update * @param postageBatchId swarm postage batch id - * @param reference chunk reference - * @returns a chunk reference + * @param reference wrapped chunk reference + * @returns a SOC reference */ const setUpdate = async ( index: number, @@ -146,8 +146,8 @@ export class SequentialFeed implements SwarmFeed { /** * Sets the next upload chunk * @param postageBatchId swarm postage batch id - * @param reference chunk reference - * @returns a chunk reference + * @param reference wrapped chunk reference + * @returns a SOC reference */ const setLastUpdate = async (postageBatchId: string | BatchId, reference: Reference): Promise => { let index: number @@ -172,7 +172,7 @@ export class SequentialFeed implements SwarmFeed { * Get Single Owner Chunk identifier * @param topic a swarm topic, bytes 32 length * @param index the chunk index - * @returns a bytes 32 + * @returns 32 bytes */ public getIdentifier(topic: Utils.Bytes.Bytes<32>, index: number): Utils.Bytes.Bytes<32> { const indexBytes = writeUint64BigEndian(index) diff --git a/src/streaming.ts b/src/streaming.ts deleted file mode 100644 index cf9594c..0000000 --- a/src/streaming.ts +++ /dev/null @@ -1,108 +0,0 @@ -import { BatchId, Bee, Reference, Signer, Topic } from '@ethersphere/bee-js' -import type { SingleOwnerChunk } from '@ethersphere/bee-js/dist/src/chunk/soc' -import type { ChunkReference } from '@ethersphere/bee-js/dist/src/feed' -import type { EthAddress } from '@ethersphere/bee-js/dist/src/utils/eth' -import { SwarmFeedHandler } from './feed' -import { Bytes, readUint64BigEndian, serializeBytes, writeUint64BigEndian } from './utils' - -export interface StreamingFeedChunk extends SingleOwnerChunk { - index: number - reference: ChunkReference - timestamp: number - updatePeriod: number -} - -export type FaultTolerantStreamType = 'fault-tolerant-stream' - -/** Interface for feed type classes */ -export interface IStreamingFeed { - /** Feed type identifier */ - readonly type: FaultTolerantStreamType - /** initialised BeeJS instance */ - readonly bee: Bee - /** get Feed interface with read operations */ - makeFeedR( - topic: Topic | Uint8Array | string, - owner: EthAddress | Uint8Array | string, - ...options: any[] - ): SwarmStreamingFeedR - /** get Feed interface with write and read operations */ - makeFeedRW( - topic: Topic | Uint8Array | string, - signer: Signer | Uint8Array | string, - options?: any, - ): SwarmStreamingFeedRW - /** Get Single Owner Chunk identifier */ - getIdentifier(topic: Bytes<32>, index: Index): Bytes<32> -} - -/** Swarm Feed Read operations */ -export interface SwarmStreamingFeedR extends SwarmFeedHandler { - getIndexForArbitraryTime(lookupTime: number, initialTime?: number, updatePeriod?: number): number - getUpdate(initialTime: number, updatePeriod: number, lookupTime?: number): Promise - getUpdates(initialTime: number, updatePeriod: number): Promise - findLastUpdate(initialTime: number, updatePeriod: number): Promise - getLastIndex(initialTime: number, updatePeriod: number): Promise -} - -/** Swarm Feed Read and Write operations */ -export interface SwarmStreamingFeedRW extends SwarmStreamingFeedR { - setLastUpdate( - postageBatchId: string | BatchId, - reference: Reference, - initialTime: number, - updatePeriod: number, - lookupTime?: number, - ): Promise - setUpdate( - index: number, - postageBatchId: string | BatchId, - reference: Reference, - initialTime: number, - updatePeriod: number, - lookupTime?: number, - ): Promise -} - -export function extractDataFromSocPayload(payload: Uint8Array): StreamingFeedChunk { - const index = readUint64BigEndian(payload.slice(0, 8) as Bytes<8>) - const updatePeriod = readUint64BigEndian(payload.slice(8, 16) as Bytes<8>) - const timestamp = readUint64BigEndian(payload.slice(16, 24) as Bytes<8>) - const p = payload.slice(24) // 32 bytes - - if (p.length === 32 || p.length === 64) { - return { - timestamp, - updatePeriod, - index, - reference: p as ChunkReference, - } as any - } - - // TODO handle JSON-like metadata - throw new Error('NotImplemented: payload is longer than expected') -} - -export function mapSocToFeed(socChunk: SingleOwnerChunk): StreamingFeedChunk { - const { reference, timestamp, updatePeriod, index } = extractDataFromSocPayload(socChunk.payload()) - - return { - ...socChunk, - index, - timestamp, - updatePeriod, - reference: reference, - } -} - -export function assembleSocPayload( - reference: ChunkReference, - options: { at: number; updatePeriod: number; index: number }, -): Uint8Array { - const at = options.at ?? Date.now() / 1000.0 - const timestamp = writeUint64BigEndian(at) - const updatePeriod = writeUint64BigEndian(options.updatePeriod) - const chunkIndex = writeUint64BigEndian(options.index) - - return serializeBytes(chunkIndex, updatePeriod, timestamp, reference) -} From 53f7a2e9a0ab1f11a55d2abc6431b6a60272a281 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20Levente=20T=C3=B3th?= Date: Mon, 21 Aug 2023 18:06:03 +0200 Subject: [PATCH 18/21] ci: remove node 14 since it is not supported anymore --- .github/workflows/test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index f29eff1..c1d9f30 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -23,7 +23,7 @@ jobs: strategy: matrix: - node-version: [14.x, 16.x, 18.x] + node-version: [16.x, 18.x] steps: - name: Checkout From e4a3a5325741dc9239e0859d1a2533ef49b50903 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20Levente=20T=C3=B3th?= Date: Mon, 21 Aug 2023 18:12:31 +0200 Subject: [PATCH 19/21] docs: bump node version in the badge --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 598d903..a156fb7 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ [![Tests](https://github.com/fairDataSociety/swarm-feeds/actions/workflows/test.yml/badge.svg)](https://github.com/fairDataSociety/swarm-feeds/actions/workflows/test.yml) -![](https://img.shields.io/badge/Node.js-%3E%3D12.0.0-orange.svg?style=flat-square) +![](https://img.shields.io/badge/Node.js-%3E%3D16.0.0-orange.svg?style=flat-square) ![](https://img.shields.io/badge/runs%20in-browser%20%7C%20node%20%7C%20webworker%20%7C%20electron-orange) # Swarm Feeds From 4500ee50688ef41c6131c76429dbb35e584e5338 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20Levente=20T=C3=B3th?= Date: Mon, 21 Aug 2023 18:26:15 +0200 Subject: [PATCH 20/21] test: added reference check --- src/streaming-feed/index.ts | 2 +- test/integration/streaming-feed.spec.ts | 29 +++++++++++++++++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/src/streaming-feed/index.ts b/src/streaming-feed/index.ts index 7a58f86..b14882d 100644 --- a/src/streaming-feed/index.ts +++ b/src/streaming-feed/index.ts @@ -53,7 +53,7 @@ export class StreamingFeed implements SwarmFeed { * Download Feed Chunk at Specific Time * @param lookupTime lookup time * @param discover indicates whether the algorithm will look for the closest successful hit - * @returns a StreamingFeedChunk object + * @returns a FeedChunk object */ const getUpdate = async (lookupTime?: number, discover = true): Promise => { lookupTime = lookupTime ?? getCurrentTime() diff --git a/test/integration/streaming-feed.spec.ts b/test/integration/streaming-feed.spec.ts index 4293576..664f326 100644 --- a/test/integration/streaming-feed.spec.ts +++ b/test/integration/streaming-feed.spec.ts @@ -73,4 +73,33 @@ describe('streaming feed', () => { const feedUpdateResponse = await feedRw.getUpdates() expect(feedUpdateResponse.length).toEqual(numUpdates) }, 45000) + + test('multiple updates using setUpdate and lookup', async () => { + const streamingFeed = streamingFeedFactory(getCurrentTime(), updatePeriod) + + const reference = Utils.Hex.makeHexString(new Date().getTime().toString().padStart(64, '0'), 64) + const referenceBytes = hexToBytes(reference) + assertBytes(referenceBytes, 32) + const multipleUpdateTopic = randomTopic(2) + + const feedRw = streamingFeed.makeFeedRW(multipleUpdateTopic, signer) + + const numUpdates = 5 + + const references: Bytes<32>[] = [] + let lookupTime = getCurrentTime() + for (let i = 0; i < numUpdates; i++) { + const referenceI = new Uint8Array([i, ...referenceBytes.slice(1)]) as Bytes<32> + references.push(referenceI) + + await feedRw.setLastUpdate(batchId, Utils.Hex.bytesToHex(referenceI)) + await sleep(updatePeriod) + await feedRw.getUpdate(lookupTime) + lookupTime = getCurrentTime() + } + + const feedUpdateResponse = await feedRw.getUpdates() + expect(feedUpdateResponse.map(e => e.reference)).toStrictEqual(references.map(e => bytesToHex(e)).reverse()) + expect(feedUpdateResponse.length).toEqual(numUpdates) + }, 45000) }) From 4180f4b96499f8c0bfc814256c1fec9b16e9d75b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20Levente=20T=C3=B3th?= Date: Mon, 21 Aug 2023 20:02:49 +0200 Subject: [PATCH 21/21] test: add irregular update tests --- test/integration/streaming-feed.spec.ts | 39 +++++++++++-------------- 1 file changed, 17 insertions(+), 22 deletions(-) diff --git a/test/integration/streaming-feed.spec.ts b/test/integration/streaming-feed.spec.ts index 664f326..af60d5f 100644 --- a/test/integration/streaming-feed.spec.ts +++ b/test/integration/streaming-feed.spec.ts @@ -1,7 +1,7 @@ import { Bee, Reference, Topic, Utils } from '@ethersphere/bee-js' import { getCurrentTime } from '../../src/utils' import { StreamingFeed } from '../../src/streaming-feed/index' -import { assertBytes, Bytes, bytesToHex, HexString, hexToBytes, makePrivateKeySigner } from '../../src/utils' +import { Bytes, bytesToHex, HexString, hexToBytes, makePrivateKeySigner } from '../../src/utils' import { beeUrl, getPostageBatch, randomTopic, sleep } from '../utils' jest.setTimeout(360 * 1000) describe('streaming feed', () => { @@ -52,54 +52,49 @@ describe('streaming feed', () => { test('multiple updates using setUpdate and lookup', async () => { const streamingFeed = streamingFeedFactory(getCurrentTime(), updatePeriod) - const reference = Utils.Hex.makeHexString(new Date().getTime().toString().padStart(64, '0'), 64) - const referenceBytes = hexToBytes(reference) - assertBytes(referenceBytes, 32) + const referenceBytes = new Uint8Array(32) const multipleUpdateTopic = randomTopic(2) const feedRw = streamingFeed.makeFeedRW(multipleUpdateTopic, signer) const numUpdates = 5 - let lookupTime = getCurrentTime() for (let i = 0; i < numUpdates; i++) { const referenceI = new Uint8Array([i, ...referenceBytes.slice(1)]) as Bytes<32> await feedRw.setLastUpdate(batchId, Utils.Hex.bytesToHex(referenceI)) await sleep(updatePeriod) - await feedRw.getUpdate(lookupTime) - lookupTime = getCurrentTime() } const feedUpdateResponse = await feedRw.getUpdates() expect(feedUpdateResponse.length).toEqual(numUpdates) }, 45000) - test('multiple updates using setUpdate and lookup', async () => { + test('multiple irregular updates', async () => { const streamingFeed = streamingFeedFactory(getCurrentTime(), updatePeriod) - const reference = Utils.Hex.makeHexString(new Date().getTime().toString().padStart(64, '0'), 64) - const referenceBytes = hexToBytes(reference) - assertBytes(referenceBytes, 32) - const multipleUpdateTopic = randomTopic(2) + const referenceBytes = new Uint8Array(32) + const multipleUpdateTopic = randomTopic(3) const feedRw = streamingFeed.makeFeedRW(multipleUpdateTopic, signer) - const numUpdates = 5 + const numUpdates = 2 const references: Bytes<32>[] = [] - let lookupTime = getCurrentTime() - for (let i = 0; i < numUpdates; i++) { - const referenceI = new Uint8Array([i, ...referenceBytes.slice(1)]) as Bytes<32> - references.push(referenceI) - await feedRw.setLastUpdate(batchId, Utils.Hex.bytesToHex(referenceI)) - await sleep(updatePeriod) - await feedRw.getUpdate(lookupTime) - lookupTime = getCurrentTime() - } + let reference = new Uint8Array([0, ...referenceBytes.slice(1)]) as Bytes<32> + await sleep(2 * updatePeriod) + references.push(reference) + await feedRw.setLastUpdate(batchId, Utils.Hex.bytesToHex(reference)) + await sleep(2 * updatePeriod) + + reference = new Uint8Array([1, ...referenceBytes.slice(1)]) as Bytes<32> + references.push(reference) + await feedRw.setLastUpdate(batchId, Utils.Hex.bytesToHex(reference)) const feedUpdateResponse = await feedRw.getUpdates() expect(feedUpdateResponse.map(e => e.reference)).toStrictEqual(references.map(e => bytesToHex(e)).reverse()) expect(feedUpdateResponse.length).toEqual(numUpdates) + expect(feedUpdateResponse[1].index).toBe(2) // first update; first 2 indices sleeptime + expect(feedUpdateResponse[0].index).toBe(4) // latest update; first 2 indices sleeptime then normal update and 2 sleep times again }, 45000) })