Skip to content

Commit

Permalink
Merge pull request #62 from usherlabs/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
rsoury authored Jul 6, 2024
2 parents 7e59149 + 8cea7c8 commit a74ab98
Show file tree
Hide file tree
Showing 15 changed files with 8,755 additions and 6,678 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"author": "Ryan Soury <[email protected]>",
"license": "GPL-3.0",
"scripts": {
"lsn": "node ./packages/core/dist/bin/logstore-broker.js",
"build": "npx turbo run build --force",
"clean": "del packages/*/node_modules packages/*/dist node_modules",
"format": "prettier --write \"**/*.{js,jsx,mjs,ts,tsx,json,css,scss,md,sol}\""
Expand Down
19 changes: 10 additions & 9 deletions packages/core/src/plugins/logStore/LogStore.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { StreamMessage } from '@streamr/protocol';
import { StreamMessage, toStreamID } from '@streamr/protocol';
import { Logger, MetricsContext, RateMetric } from '@streamr/utils';
import { Readable } from 'stream';

Expand Down Expand Up @@ -58,10 +58,11 @@ export class LogStore extends DatabaseEventEmitter {
partition: number,
requestCount: number
): Readable {
const verifiedStreamId = toStreamID(streamId);
if (requestCount < 0) {
return this.db.queryFirst(streamId, partition, -requestCount);
return this.db.queryFirst(verifiedStreamId, partition, -requestCount);
} else {
return this.db.queryLast(streamId, partition, requestCount);
return this.db.queryLast(verifiedStreamId, partition, requestCount);
}
}

Expand All @@ -84,7 +85,7 @@ export class LogStore extends DatabaseEventEmitter {

return this.db
.queryRange(
streamId,
toStreamID(streamId),
partition,
fromTimestamp,
fromSequenceNo,
Expand Down Expand Up @@ -130,7 +131,7 @@ export class LogStore extends DatabaseEventEmitter {
}
return this.db
.queryRange(
streamId,
toStreamID(streamId),
partition,
fromTimestamp,
fromSequenceNo,
Expand Down Expand Up @@ -166,7 +167,7 @@ export class LogStore extends DatabaseEventEmitter {
partition: number
): Promise<number> {
const firstMessageDateInStream = await this.db.getFirstMessageDateInStream(
streamId,
toStreamID(streamId),
partition
);
return firstMessageDateInStream
Expand All @@ -179,7 +180,7 @@ export class LogStore extends DatabaseEventEmitter {
partition: number
): Promise<number> {
const lastMessageDateInStream = await this.db.getLastMessageDateInStream(
streamId,
toStreamID(streamId),
partition
);
return lastMessageDateInStream
Expand All @@ -191,14 +192,14 @@ export class LogStore extends DatabaseEventEmitter {
streamId: string,
partition: number
): Promise<number> {
return this.db.getNumberOfMessagesInStream(streamId, partition);
return this.db.getNumberOfMessagesInStream(toStreamID(streamId), partition);
}

async getTotalBytesInStream(
streamId: string,
partition: number
): Promise<number> {
return this.db.getTotalBytesInStream(streamId, partition);
return this.db.getTotalBytesInStream(toStreamID(streamId), partition);
}

public async close(): Promise<void> {
Expand Down
8 changes: 7 additions & 1 deletion packages/core/src/plugins/logStore/NodeStreamsRegistry.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import StreamrClient, { Stream } from '@streamr/sdk';
import { ObservableEventEmitter } from '@streamr/utils';
import { Logger, ObservableEventEmitter } from '@streamr/utils';

const logger = new Logger(module);

export class NodeStreamsRegistry extends ObservableEventEmitter<{
registerStream: (stream: Stream) => void;
Expand All @@ -22,6 +24,8 @@ export class NodeStreamsRegistry extends ObservableEventEmitter<{
const stream = await this.streamrClient.getStream(streamId);
this.registeredStreams.set(streamId, stream);

logger.info('Registered stream to be tracked', { streamId });

this.emit('registerStream', stream);
}

Expand All @@ -32,6 +36,8 @@ export class NodeStreamsRegistry extends ObservableEventEmitter<{
}
this.registeredStreams.delete(streamId);

logger.info('Unregistered stream from being tracked', { streamId });

this.emit('unregisterStream', stream);
}

Expand Down
19 changes: 11 additions & 8 deletions packages/core/src/plugins/logStore/database/DatabaseAdapter.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { MessageRef, StreamMessage } from '@streamr/protocol';
import { Logger, ObservableEventEmitter } from '@streamr/utils';
import { Readable } from 'stream';
import { StreamID } from '@streamr/sdk';

const logger = new Logger(module);

Expand Down Expand Up @@ -33,8 +34,10 @@ export abstract class DatabaseAdapter extends DatabaseEventEmitter {
super();
}

// Note: important to use `StreamID` for streamIds, because it checks the correct casing and format

abstract queryRange(
streamId: string,
streamId: StreamID,
partition: number,
fromTimestamp: number,
fromSequenceNo: number,
Expand All @@ -46,40 +49,40 @@ export abstract class DatabaseAdapter extends DatabaseEventEmitter {
): Readable;

abstract queryByMessageRefs(
streamId: string,
streamId: StreamID,
partition: number,
messageRefs: MessageRef[]
): Readable;

abstract queryFirst(
streamId: string,
streamId: StreamID,
partition: number,
requestCount: number
): Readable;

abstract queryLast(
streamId: string,
streamId: StreamID,
partition: number,
requestCount: number
): Readable;

abstract getFirstMessageDateInStream(
streamId: string,
streamId: StreamID,
partition: number
): Promise<number | null>;

abstract getLastMessageDateInStream(
streamId: string,
streamId: StreamID,
partition: number
): Promise<number | null>;

abstract getNumberOfMessagesInStream(
streamId: string,
streamId: StreamID,
partition: number
): Promise<number>;

abstract getTotalBytesInStream(
streamId: string,
streamId: StreamID,
partition: number
): Promise<number>;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ export class SQLiteDBAdapter extends DatabaseAdapter {
payload: payload,
content_bytes: payload.length,
};
await this.dbClient.insert(streamDataTable).values(record);
await this.dbClient.insert(streamDataTable).values(record).onConflictDoNothing();

this.emit('write', record.payload);

Expand Down
10 changes: 9 additions & 1 deletion packages/core/src/plugins/logStore/http/dataQueryEndpoint.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
* Endpoints for RESTful data requests
*/
import {
Logger,
MetricsContext,
MetricsDefinition,
RateMetric,
Expand All @@ -20,6 +21,7 @@ import { sendError, sendSuccess } from './httpHelpers';
import { injectLogstoreContextMiddleware } from './injectLogstoreContextMiddleware';
import { FromRequest, LastRequest, RangeRequest } from './requestTypes';

const logger = new Logger(module);

// TODO: move this to protocol-js
export const MIN_SEQUENCE_NUMBER_VALUE = 0;
Expand Down Expand Up @@ -100,6 +102,10 @@ const getDataForRequest = async (

const createHandler = (metrics: MetricsDefinition): RequestHandler => {
return async (req: Request, res: Response) => {
logger.debug('Received HTTP data query request', {
query: req.query,
params: req.params,
});
if (Number.isNaN(parseInt(req.params.partition))) {
sendError(
`Path parameter "partition" not a number: ${req.params.partition}`,
Expand Down Expand Up @@ -167,7 +173,9 @@ export const createDataQueryEndpoint = (
};
metricsContext.addMetrics('broker.plugin.logstore', metrics);
return {
path: `/stores/:id/data/partitions/:partition/:queryType`,
// permit usage of slashes in paths
// \S = non-whitespace character, we use it because `.` doesn't work well with express in this context
path: `/stores/:id(\\S+\?)/data/partitions/:partition/:queryType`,
method: 'get',
requestHandlers: [
// We need to inject it here, because the execution context from
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/plugins/logStore/http/readyEndpoint.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ export const createReadyEndpoint = (): HttpServerEndpoint => {
const ctx = logStoreContext.getStore();

return {
path: `/stores/:id/partitions/:partition/ready`,
path: `/stores/:id(\\S+\?)/partitions/:partition/ready`,
method: 'get',
requestHandlers: [
// We need to inject it here, because the execution context from
Expand Down
12 changes: 7 additions & 5 deletions packages/core/src/plugins/logStore/network/Aggregator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
QueryResponse,
QueryType,
} from '@logsn/protocol';
import { MessageRef } from '@streamr/protocol';
import { MessageRef, toStreamID } from '@streamr/protocol';
import { convertBytesToStreamMessage } from '@streamr/trackerless-network';
import { EthereumAddress, Logger } from '@streamr/utils';
import { PassThrough, pipeline, Readable } from 'stream';
Expand Down Expand Up @@ -70,18 +70,20 @@ export class Aggregator extends PassThrough {
this.aggregationList = new AggregationList();
this.queryStreams = new Set<Readable>();

const streamId = toStreamID(this.queryRequest.streamId);

let queryStream: Readable;
switch (this.queryRequest.queryOptions.queryType) {
case QueryType.Last:
queryStream = this.database.queryLast(
this.queryRequest.streamId,
streamId,
this.queryRequest.partition,
this.queryRequest.queryOptions.last
);
break;
case QueryType.From:
queryStream = this.database.queryRange(
this.queryRequest.streamId,
streamId,
this.queryRequest.partition,
this.queryRequest.queryOptions.from.timestamp,
this.queryRequest.queryOptions.from.sequenceNumber ??
Expand All @@ -94,7 +96,7 @@ export class Aggregator extends PassThrough {
break;
case QueryType.Range:
queryStream = this.database.queryRange(
this.queryRequest.streamId,
streamId,
this.queryRequest.partition,
this.queryRequest.queryOptions.from.timestamp,
this.queryRequest.queryOptions.from.sequenceNumber ??
Expand Down Expand Up @@ -235,7 +237,7 @@ export class Aggregator extends PassThrough {

// TODO: Handle an error if the pipe gets broken
const queryStream = this.database.queryRange(
this.queryRequest.streamId,
toStreamID(this.queryRequest.streamId),
this.queryRequest.partition,
readyFrom.timestamp,
readyFrom.sequenceNumber,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,12 +144,17 @@ export class LogStoreNetworkPlugin extends LogStorePlugin {
{
onStreamPartAdded: async (streamPart) => {
try {
await node.join(streamPart, { minCount: 1, timeout: 5000 }); // best-effort, can time out
await node
.join(streamPart, { minCount: 1, timeout: 5000 })
.catch(() => {}); // best-effort, can time out. No-op on error
await this.nodeStreamsRegistry.registerStreamId(
StreamPartIDUtils.getStreamID(streamPart)
);
} catch (_e) {
// no-op
} catch (e) {
logger.error('error after joining stream', {
error: e,
streamPart,
});
}
try {
// TODO: Temporary disabled sending of assignment messages through the system stream.
Expand Down
11 changes: 7 additions & 4 deletions packages/core/src/plugins/logStore/network/Propagator.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { QueryRequest, QueryResponse, QueryType } from '@logsn/protocol';
import { toStreamID } from '@streamr/protocol';
import { convertBytesToStreamMessage } from '@streamr/trackerless-network';
import { Logger } from '@streamr/utils';
import { PassThrough, pipeline, Readable } from 'stream';
Expand Down Expand Up @@ -50,18 +51,20 @@ export class Propagator {
this.propagationStream = new PassThrough({ objectMode: true });
this.queryStreams = new Set<Readable>();

const streamId = toStreamID(this.queryRequest.streamId);

let queryStream: Readable;
switch (this.queryRequest.queryOptions.queryType) {
case QueryType.Last:
queryStream = this.database.queryLast(
this.queryRequest.streamId,
streamId,
this.queryRequest.partition,
this.queryRequest.queryOptions.last
);
break;
case QueryType.From:
queryStream = this.database.queryRange(
this.queryRequest.streamId,
streamId,
this.queryRequest.partition,
this.queryRequest.queryOptions.from.timestamp,
this.queryRequest.queryOptions.from.sequenceNumber ??
Expand All @@ -74,7 +77,7 @@ export class Propagator {
break;
case QueryType.Range:
queryStream = this.database.queryRange(
this.queryRequest.streamId,
streamId,
this.queryRequest.partition,
this.queryRequest.queryOptions.from.timestamp,
this.queryRequest.queryOptions.from.sequenceNumber ??
Expand Down Expand Up @@ -150,7 +153,7 @@ export class Propagator {

if (messageRefs.length) {
const queryStream = this.database.queryByMessageRefs(
this.queryRequest.streamId,
toStreamID(this.queryRequest.streamId),
this.queryRequest.partition,
messageRefs
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@ import {
toStreamID,
toStreamPartID,
} from '@streamr/protocol';
import { executeSafePromise } from '@streamr/utils';
import { executeSafePromise, Logger } from '@streamr/utils';
import _ from 'lodash';

import { PluginOptions, StandaloneModeConfig } from '../../../Plugin';
import { BaseQueryRequestManager } from '../BaseQueryRequestManager';
import { LogStorePlugin } from '../LogStorePlugin';
import { LogStoreStandaloneConfig } from './LogStoreStandaloneConfig';

const logger = new Logger(module);

export class LogStoreStandalonePlugin extends LogStorePlugin {
private standaloneQueryRequestManager: BaseQueryRequestManager;

Expand Down Expand Up @@ -65,12 +67,17 @@ export class LogStoreStandalonePlugin extends LogStorePlugin {
const logStoreConfig = new LogStoreStandaloneConfig(streamPartIds, {
onStreamPartAdded: async (streamPart) => {
try {
await node.join(streamPart, { minCount: 1, timeout: 5000 }); // best-effort, can time out
await node
.join(streamPart, { minCount: 1, timeout: 5000 })
.catch(() => {}); // best-effort, can time out. No-op on error
await this.nodeStreamsRegistry.registerStreamId(
StreamPartIDUtils.getStreamID(streamPart)
);
} catch (_e) {
// no-op
} catch (e) {
logger.error('error after joining stream', {
error: e,
streamPart,
});
}
},
onStreamPartRemoved: async (streamPart) => {
Expand Down
Loading

0 comments on commit a74ab98

Please sign in to comment.