Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore!: update proto definitions #1196

Merged
merged 9 commits into from
Feb 27, 2023
20 changes: 10 additions & 10 deletions packages/core/src/lib/filter/filter_rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@ export type ContentFilter = {
/**
* FilterRPC represents a message conforming to the Waku Filter protocol
*/
export class FilterRPC {
public constructor(public proto: proto.FilterRPC) {}
export class FilterRpc {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you change capitalisation here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Capitalization was changed on proto file to proper PascalCase, I just match it as PascalCase is what we use across the codebase.

public constructor(public proto: proto.FilterRpc) {}

static createRequest(
topic: string,
contentFilters: ContentFilter[],
requestId?: string,
subscribe = true
): FilterRPC {
return new FilterRPC({
): FilterRpc {
return new FilterRpc({
requestId: requestId || uuid(),
request: {
subscribe,
Expand All @@ -31,26 +31,26 @@ export class FilterRPC {
/**
*
* @param bytes Uint8Array of bytes from a FilterRPC message
* @returns FilterRPC
* @returns FilterRpc
*/
static decode(bytes: Uint8Array): FilterRPC {
const res = proto.FilterRPC.decode(bytes);
return new FilterRPC(res);
static decode(bytes: Uint8Array): FilterRpc {
const res = proto.FilterRpc.decode(bytes);
return new FilterRpc(res);
}

/**
* Encode the current FilterRPC request to bytes
* @returns Uint8Array
*/
encode(): Uint8Array {
return proto.FilterRPC.encode(this.proto);
return proto.FilterRpc.encode(this.proto);
}

get push(): proto.MessagePush | undefined {
return this.proto.push;
}

get requestId(): string | undefined {
get requestId(): string {
return this.proto.requestId;
}
}
12 changes: 4 additions & 8 deletions packages/core/src/lib/filter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import { DefaultPubSubTopic } from "../constants.js";
import { groupByContentTopic } from "../group_by.js";
import { toProtoMessage } from "../to_proto_message.js";

import { ContentFilter, FilterRPC } from "./filter_rpc.js";
import { ContentFilter, FilterRpc } from "./filter_rpc.js";

export { ContentFilter };

Expand Down Expand Up @@ -75,18 +75,14 @@ class Filter extends BaseProtocol implements IFilter {
const contentFilters = contentTopics.map((contentTopic) => ({
contentTopic,
}));
const request = FilterRPC.createRequest(
const request = FilterRpc.createRequest(
pubSubTopic,
contentFilters,
undefined,
true
);

const requestId = request.requestId;
if (!requestId)
throw new Error(
"Internal error: createRequest expected to set `requestId`"
);

const peer = await this.getPeer(opts?.peerId);
const stream = await this.newStream(peer);
Expand Down Expand Up @@ -128,7 +124,7 @@ class Filter extends BaseProtocol implements IFilter {
try {
pipe(streamData.stream, lp.decode(), async (source) => {
for await (const bytes of source) {
const res = FilterRPC.decode(bytes.slice());
const res = FilterRpc.decode(bytes.slice());
if (res.requestId && res.push?.messages?.length) {
await this.pushMessages(res.requestId, res.push.messages);
}
Expand Down Expand Up @@ -228,7 +224,7 @@ class Filter extends BaseProtocol implements IFilter {
requestId: string,
peer: Peer
): Promise<void> {
const unsubscribeRequest = FilterRPC.createRequest(
const unsubscribeRequest = FilterRpc.createRequest(
topic,
contentFilters,
requestId,
Expand Down
6 changes: 3 additions & 3 deletions packages/core/src/lib/light_push/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import { Uint8ArrayList } from "uint8arraylist";
import { BaseProtocol } from "../base_protocol.js";
import { DefaultPubSubTopic } from "../constants.js";

import { PushRPC } from "./push_rpc.js";
import { PushRpc } from "./push_rpc.js";

const log = debug("waku:light-push");

Expand Down Expand Up @@ -54,7 +54,7 @@ class LightPush extends BaseProtocol implements ILightPush {
log("Failed to encode to protoMessage, aborting push");
return { recipients };
}
const query = PushRPC.createRequest(protoMessage, pubSubTopic);
const query = PushRpc.createRequest(protoMessage, pubSubTopic);
const res = await pipe(
[query.encode()],
lp.encode(),
Expand All @@ -68,7 +68,7 @@ class LightPush extends BaseProtocol implements ILightPush {
bytes.append(chunk);
});

const response = PushRPC.decode(bytes).response;
const response = PushRpc.decode(bytes).response;

if (!response) {
log("No response in PushRPC");
Expand Down
18 changes: 9 additions & 9 deletions packages/core/src/lib/light_push/push_rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,30 @@ import { proto_lightpush as proto } from "@waku/proto";
import type { Uint8ArrayList } from "uint8arraylist";
import { v4 as uuid } from "uuid";

export class PushRPC {
public constructor(public proto: proto.PushRPC) {}
export class PushRpc {
public constructor(public proto: proto.PushRpc) {}

static createRequest(
message: proto.WakuMessage,
pubSubTopic: string
): PushRPC {
return new PushRPC({
): PushRpc {
return new PushRpc({
requestId: uuid(),
request: {
message: message,
pubSubTopic: pubSubTopic,
pubsubTopic: pubSubTopic,
},
response: undefined,
});
}

static decode(bytes: Uint8ArrayList): PushRPC {
const res = proto.PushRPC.decode(bytes);
return new PushRPC(res);
static decode(bytes: Uint8ArrayList): PushRpc {
const res = proto.PushRpc.decode(bytes);
return new PushRpc(res);
}

encode(): Uint8Array {
return proto.PushRPC.encode(this.proto);
return proto.PushRpc.encode(this.proto);
}

get query(): proto.PushRequest | undefined {
Expand Down
6 changes: 3 additions & 3 deletions packages/core/src/lib/message/topic_only_message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ import debug from "debug";
const log = debug("waku:message:topic-only");

export class TopicOnlyMessage implements IDecodedMessage {
public payload: undefined;
public payload: Uint8Array = new Uint8Array();
public rateLimitProof: undefined;
public timestamp: undefined;
public ephemeral: undefined;

constructor(private proto: ProtoTopicOnlyMessage) {}

get contentTopic(): string {
return this.proto.contentTopic ?? "";
return this.proto.contentTopic;
}
}

Expand All @@ -29,7 +29,7 @@ export class TopicOnlyDecoder implements IDecoder<TopicOnlyMessage> {
log("Message decoded", protoMessage);
return Promise.resolve({
contentTopic: protoMessage.contentTopic,
payload: undefined,
payload: new Uint8Array(),
rateLimitProof: undefined,
timestamp: undefined,
version: undefined,
Expand Down
35 changes: 11 additions & 24 deletions packages/core/src/lib/message/version_0.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,15 @@ export { proto };
export class DecodedMessage implements IDecodedMessage {
constructor(protected proto: proto.WakuMessage) {}

get _rawPayload(): Uint8Array | undefined {
if (this.proto.payload) {
return new Uint8Array(this.proto.payload);
}
return;
}

get ephemeral(): boolean {
return Boolean(this.proto.ephemeral);
}

get payload(): Uint8Array | undefined {
return this._rawPayload;
get payload(): Uint8Array {
return this.proto.payload;
}

get contentTopic(): string | undefined {
get contentTopic(): string {
return this.proto.contentTopic;
}

Expand All @@ -51,18 +44,15 @@ export class DecodedMessage implements IDecodedMessage {
const timestamp = this.proto.timestamp / OneMillion;
return new Date(Number(timestamp));
}

if (this.proto.timestampDeprecated) {
return new Date(this.proto.timestampDeprecated * 1000);
}
return;
} catch (e) {
return;
}
return;
}

get version(): number {
// https://github.com/status-im/js-waku/issues/921
// https://rfc.vac.dev/spec/14/
// > If omitted, the value SHOULD be interpreted as version 0.
return this.proto.version ?? 0;
}

Expand Down Expand Up @@ -115,8 +105,8 @@ export class Decoder implements IDecoder<DecodedMessage> {
const protoMessage = proto.WakuMessage.decode(bytes);
log("Message decoded", protoMessage);
return Promise.resolve({
payload: protoMessage.payload ?? undefined,
contentTopic: protoMessage.contentTopic ?? undefined,
payload: protoMessage.payload,
contentTopic: protoMessage.contentTopic,
version: protoMessage.version ?? undefined,
timestamp: protoMessage.timestamp ?? undefined,
rateLimitProof: protoMessage.rateLimitProof ?? undefined,
Expand All @@ -127,12 +117,9 @@ export class Decoder implements IDecoder<DecodedMessage> {
async fromProtoObj(
proto: IProtoMessage
): Promise<DecodedMessage | undefined> {
// https://github.com/status-im/js-waku/issues/921
if (proto.version === undefined) {
proto.version = 0;
}

if (proto.version !== Version) {
// https://rfc.vac.dev/spec/14/
// > If omitted, the value SHOULD be interpreted as version 0.
if (proto.version ?? 0 !== Version) {
log(
"Failed to decode due to incorrect version, expected:",
Version,
Expand Down
24 changes: 12 additions & 12 deletions packages/core/src/lib/store/history_rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ export interface Params {
cursor?: proto.Index;
}

export class HistoryRPC {
private constructor(public readonly proto: proto.HistoryRPC) {}
export class HistoryRpc {
private constructor(public readonly proto: proto.HistoryRpc) {}

get query(): proto.HistoryQuery | undefined {
return this.proto.query;
Expand All @@ -33,7 +33,7 @@ export class HistoryRPC {
/**
* Create History Query.
*/
static createQuery(params: Params): HistoryRPC {
static createQuery(params: Params): HistoryRpc {
const contentFilters = params.contentTopics.map((contentTopic) => {
return { contentTopic };
});
Expand All @@ -56,10 +56,10 @@ export class HistoryRPC {
// milliseconds 10^-3 to nanoseconds 10^-9
endTime = BigInt(params.endTime.valueOf()) * OneMillion;
}
return new HistoryRPC({
return new HistoryRpc({
requestId: uuid(),
query: {
pubSubTopic: params.pubSubTopic,
pubsubTopic: params.pubSubTopic,
contentFilters,
pagingInfo,
startTime,
Expand All @@ -69,13 +69,13 @@ export class HistoryRPC {
});
}

decode(bytes: Uint8ArrayList): HistoryRPC {
const res = proto.HistoryRPC.decode(bytes);
return new HistoryRPC(res);
decode(bytes: Uint8ArrayList): HistoryRpc {
const res = proto.HistoryRpc.decode(bytes);
return new HistoryRpc(res);
}

encode(): Uint8Array {
return proto.HistoryRPC.encode(this.proto);
return proto.HistoryRpc.encode(this.proto);
}
}

Expand All @@ -84,10 +84,10 @@ function directionToProto(
): proto.PagingInfo.Direction {
switch (pageDirection) {
case PageDirection.BACKWARD:
return proto.PagingInfo.Direction.DIRECTION_BACKWARD_UNSPECIFIED;
return proto.PagingInfo.Direction.BACKWARD;
case PageDirection.FORWARD:
return proto.PagingInfo.Direction.DIRECTION_FORWARD;
return proto.PagingInfo.Direction.FORWARD;
default:
return proto.PagingInfo.Direction.DIRECTION_BACKWARD_UNSPECIFIED;
return proto.PagingInfo.Direction.BACKWARD;
}
}
14 changes: 5 additions & 9 deletions packages/core/src/lib/store/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import {
Cursor,
IDecodedMessage,
IDecoder,
Index,
IStore,
ProtocolCreateOptions,
} from "@waku/interfaces";
Expand All @@ -22,7 +21,7 @@ import { BaseProtocol } from "../base_protocol.js";
import { DefaultPubSubTopic } from "../constants.js";
import { toProtoMessage } from "../to_proto_message.js";

import { HistoryRPC, PageDirection, Params } from "./history_rpc.js";
import { HistoryRpc, PageDirection, Params } from "./history_rpc.js";

import HistoryError = proto.HistoryResponse.HistoryError;

Expand Down Expand Up @@ -262,7 +261,7 @@ async function* paginate<T extends IDecodedMessage>(
while (true) {
queryOpts.cursor = currentCursor;

const historyRpcQuery = HistoryRPC.createQuery(queryOpts);
const historyRpcQuery = HistoryRpc.createQuery(queryOpts);

log(
"Querying store peer",
Expand Down Expand Up @@ -294,10 +293,7 @@ async function* paginate<T extends IDecodedMessage>(

const response = reply.response as proto.HistoryResponse;

if (
response.error &&
response.error !== HistoryError.ERROR_NONE_UNSPECIFIED
) {
if (response.error && response.error !== HistoryError.NONE) {
throw "History response contains an Error: " + response.error;
}

Expand Down Expand Up @@ -353,7 +349,7 @@ export function isDefined<T>(msg: T | undefined): msg is T {
export async function createCursor(
message: IDecodedMessage,
pubsubTopic: string = DefaultPubSubTopic
): Promise<Index> {
): Promise<Cursor> {
if (
!message ||
!message.timestamp ||
Expand All @@ -373,7 +369,7 @@ export async function createCursor(
digest,
pubsubTopic,
senderTime: messageTime,
receivedTime: messageTime,
receiverTime: messageTime,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am surprised that using the waku message timestamp here works. Cc @LNSD @danisharora099

};
}

Expand Down
1 change: 1 addition & 0 deletions packages/core/src/lib/to_proto_message.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { toProtoMessage } from "./to_proto_message.js";
describe("to proto message", () => {
it("Fields are not dropped", () => {
const wire: WakuMessageProto = {
payload: new Uint8Array(),
contentTopic: "foo",
};

Expand Down
Loading