-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: streaming feed #4
base: master
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a missing method in the implementation setUpdate
and also presumably a wrong implementation of getUpdates
. Please correct these and also refactor completely the docs written based on my comments.
README.md
Outdated
import { Bee, Reference, Topic, Utils } from '@ethersphere/bee-js' | ||
import type { HexString } from '@ethersphere/bee-js/dist/src/utils/hex' | ||
import { SequentialFeed } from '../../src/sequential-feed' | ||
import { assertBytes, Bytes, bytesToHex, hexToBytes, makePrivateKeySigner } from '../../src/utils' | ||
import { beeUrl, getPostageBatch } from '../utils' | ||
|
||
const myIdentity = { | ||
privateKey: '...private key as hex...' as HexString, | ||
publicKey: '03c32bb011339667a487b6c1c35061f15f7edc36aa9a0f8648aba07a4b8bd741b4' as HexString, | ||
address: '8d3766440f0d7b949a5e32995d09619a7f86e632' as HexString, | ||
} | ||
const owner = Utils.Hex.makeHexString(myIdentity.address, 40) | ||
const signer = makePrivateKeySigner(hexToBytes(myIdentity.privateKey) as Bytes<32>) | ||
const topic = '0000000000000000000000000000000000000000000000000000000000000000' as Topic | ||
const bee = new Bee(beeUrl()) | ||
const batchId = getPostageBatch() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this sample code with relative imports is wrong as you cannot run the script anywhere.
a sample code should work in a clean starting environment and/or that has to be simple (few lines of clean code).
This sample code does not satisfy either one above.
src/streaming.ts
Outdated
writeUint64BigEndian, | ||
} from './utils' | ||
|
||
export type StreamingFeedData = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is basically StreamingFeedChunk
but index
is chunkIndex
.
Delete it and use `StreamingFeedChunk only
src/streaming.ts
Outdated
} | ||
|
||
/** Swarm Feed Read and Write operations */ | ||
export interface SwarmStreamingFeedRW<Index = number> extends SwarmStreamingFeedR { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you don't use your declared type variable.
export interface SwarmStreamingFeedRW<Index = number> extends SwarmStreamingFeedR { | |
export interface SwarmStreamingFeedRW extends SwarmStreamingFeedR<number> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also, setUpdate
is missing.
src/streaming.ts
Outdated
export interface SwarmStreamingFeedR<Index = number> extends SwarmFeedHandler { | ||
getIndexForArbitraryTime(lookupTime: number, initialTime?: number, updatePeriod?: number): Promise<Index> | Index | ||
getUpdate(initialTime: number, updatePeriod: number, lookupTime?: Index): Promise<StreamingFeedChunk<Index>> | ||
getUpdates(initialTime: number, updatePeriod: number): Promise<StreamingFeedChunk<Index>[]> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would be nice to pass a time interval for fetching e.g. lookupTimeInterval: [number, number]
src/streaming.ts
Outdated
|
||
/** Swarm Feed Read operations */ | ||
export interface SwarmStreamingFeedR<Index = number> extends SwarmFeedHandler { | ||
getIndexForArbitraryTime(lookupTime: number, initialTime?: number, updatePeriod?: number): Promise<Index> | Index |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it shouldn't give back Promise
as it can be calculated completely client-side
src/streaming-feed.ts
Outdated
const index = await getIndexForArbitraryTime(getCurrentTime(), initialTime, updatePeriod) | ||
const socChunk = await socReader.download(this.getIdentifier(topicBytes, index - 1)) | ||
|
||
let feed = await mapSocToFeed(socChunk) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not necessary await
src/streaming-feed.ts
Outdated
const socChunk = await socReader.download(this.getIdentifier(topicBytes, index - 1)) | ||
|
||
let feed = await mapSocToFeed(socChunk) | ||
let i = 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the feed indexing starts at 0 as far as I know.
src/streaming-feed.ts
Outdated
// while from last to first, use lookupTime = chunk.timestamp + 1 | ||
|
||
const index = await getIndexForArbitraryTime(getCurrentTime(), initialTime, updatePeriod) | ||
const socChunk = await socReader.download(this.getIdentifier(topicBytes, index - 1)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why you download the penultimate chunk instead of the last one?
src/streaming-feed.ts
Outdated
let i = 1 | ||
feeds.push(feed) | ||
while (i < index) { | ||
feed = await getUpdate(initialTime, updatePeriod, feed.timestamp + updatePeriod + 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I cannot comprehend with this section here.
you downloaded the penultimate chunk from which you go forward in updates that you repeat times based on the index of the penultimate chunk...
can you elaborate please? maybe I'm missing something.
src/streaming-feed.ts
Outdated
} | ||
|
||
/** Get Single Owner Chunk identifier */ | ||
public getIdentifier(topic: Utils.Bytes.Bytes<32>, index: number): Utils.Bytes.Bytes<32> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please provide detailed JSDoc here, since index: number
is disambiguous because timestamp is also number as well as its transformed sequential index.
Is this done, I guess it is ready for review @molekilla ? |
src/sequential-feed.ts
Outdated
/** | ||
* Creates a sequential feed reader | ||
* @param topic a swarm topic | ||
* @param signer signer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there is no such a parameter.
/** | ||
* Gets the last index in the feed | ||
* @returns An index number | ||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It does have any sense to define JSDoc above this, and the other the arrow functions. if you want to keep these then define these about the corresponding interface definitions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will fix the sequential feed, remember it was started by you :)
src/streaming.ts
Outdated
} | ||
|
||
/** Interface for feed type classes */ | ||
export interface SwarmStreamingFeed<Index> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please, rename it to IStreamingFeed
src/streaming.ts
Outdated
/** Interface for feed type classes */ | ||
export interface SwarmStreamingFeed<Index> { | ||
/** Feed type identifier */ | ||
readonly type: FeedType |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
feed type here is only fault-tolerant-stream
src/streaming.ts
Outdated
} | ||
|
||
/** Swarm Feed Read operations */ | ||
export interface SwarmStreamingFeedR extends SwarmFeedHandler { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the already defined SwarmFeedR
why was not good enough?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
findLastUpdate
and getLastIndex
is missing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SwarmStreamingFeedR
I will leave it, streaming uses the index in every method call and before doing that changed I circled back with you about it. Let me know.
Will add the other two
src/streaming.ts
Outdated
const updatePeriod = writeUint64BigEndian(options.updatePeriod) | ||
const chunkIndex = writeUint64BigEndian(options.index) | ||
|
||
return serializeBytes(chunkIndex, updatePeriod, timestamp, reference) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggest that you should check here how long the reference
is and encode it to the soc payload.
@@ -39,10 +39,12 @@ describe('feed', () => { | |||
}, 21000) | |||
|
|||
test('multiple updates using setUpdate and lookup', async () => { | |||
const reference = Utils.Hex.makeHexString('0000000000000000000000000000000000000000000000000000000000000000', 64) | |||
const reference = Utils.Hex.makeHexString(new Date().getTime().toString().padStart(64, '0'), 64) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why did you touch this part?
- it is not related to the streaming of feeds
- it messes up the inteded test logic as you check the code below.
test('lookup for empty feed update', async () => { | ||
const emptyTopic = '1200000000000000000000000000000000000000000000000000000000000001' as Topic | ||
const feedR = streamingFeed.makeFeedR(emptyTopic, testIdentity.address) | ||
const lastIndex = await feedR.getIndexForArbitraryTime(getCurrentTime()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unnecesarry await
const initialTime = getCurrentTime() | ||
|
||
const sleep = async (seconds: number) => | ||
new Promise((resolve, reject) => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not used reject
variable.
src/streaming-feed.ts
Outdated
* @param lookupTime lookup time | ||
* @returns Returns -1 if not found, otherwise the index | ||
*/ | ||
const getIndexForArbitraryTime = (lookupTime: number, initialTime: number, updatePeriod: number): number => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would move it to a class method or normal helper function.
#3
Pending docs