Skip to content

Commit

Permalink
Merge remote-tracking branch 'bebop/bebop-ws-fixes' into feat/BACK-1748
Browse files Browse the repository at this point in the history
  • Loading branch information
JackieJoo committed Nov 5, 2024
2 parents beb7249 + ccfb89e commit b39ee21
Show file tree
Hide file tree
Showing 10 changed files with 270 additions and 116 deletions.
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
"@hashflow/sdk": "^2.2.7",
"@hashflow/taker-js": "^0.3.7",
"@paraswap/core": "2.4.0",
"@types/ws": "^8.5.12",
"async": "^3.2.4",
"axios": "0.27.2",
"bignumber.js": "9.1.0",
Expand All @@ -69,6 +70,7 @@
"lodash": "4.17.21",
"log4js": "6.6.1",
"node-cache": "^5.1.2",
"protobufjs": "^7.4.0",
"ts-essentials": "9.1.2",
"uuid": "^9.0.0",
"web3": "1.6.0",
Expand All @@ -77,7 +79,7 @@
"web3-eth-abi": "1.4.0",
"web3-eth-contract": "1.6.0",
"web3-utils": "1.6.0",
"websocket": "1.0.35"
"ws": "^8.18.0"
},
"sideEffects": false
}
1 change: 1 addition & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ const baseConfigs: { [network: number]: BaseConfig } = {
multicallV2Address: '0xC50F4c1E81c873B2204D7eFf7069Ffec6Fbe136D',
privateHttpProvider: process.env.HTTP_PROVIDER_56,
augustusV6Address: '0x6a000f20005980200259b80c5102003040001068',
bebopAuthToken: process.env.API_KEY_BEBOP_AUTH_TOKEN || '',
executorsAddresses: {
Executor01: '0x000010036C0190E009a000d0fc3541100A07380A',
Executor02: '0x00C600b30fb0400701010F4b080409018B9006E0',
Expand Down
21 changes: 21 additions & 0 deletions src/dex/bebop/bebop-e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -192,4 +192,25 @@ describe('Bebop E2E', () => {
nativeTokenAmount,
);
});

describe('BSC', () => {
const network = Network.BSC;

const tokenASymbol: string = 'USDT';
const tokenBSymbol: string = 'ETH';

const tokenAAmount: string = '100000000000000000000';
const tokenBAmount: string = '100000000000000000';
const nativeTokenAmount = '1000000000000000000';

testForNetwork(
network,
dexKey,
tokenASymbol,
tokenBSymbol,
tokenAAmount,
tokenBAmount,
nativeTokenAmount,
);
});
});
15 changes: 15 additions & 0 deletions src/dex/bebop/bebop.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
syntax = "proto3";

package bebop;

message PriceUpdate {
optional bytes base = 1;
optional bytes quote = 2;
optional uint64 last_update_ts = 3;
repeated float bids = 4 [packed=true];
repeated float asks = 5 [packed=true];
}

message BebopPricingUpdate {
repeated PriceUpdate pairs = 1;
}
2 changes: 1 addition & 1 deletion src/dex/bebop/bebop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ export class Bebop extends SimpleExchange implements IDex<BebopData> {
pricesReqParams: {
url:
BEBOP_WS_API_URL +
`/pmm/${BebopConfig['Bebop'][network].chainName}/v3/pricing`,
`/pmm/${BebopConfig['Bebop'][network].chainName}/v3/pricing?format=protobuf`,
headers: {
name: BEBOP_AUTH_NAME,
authorization: this.bebopAuthToken,
Expand Down
5 changes: 5 additions & 0 deletions src/dex/bebop/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,10 @@ export const BebopConfig: DexConfigMap<DexParams> = {
chainName: 'optimism',
middleTokens: ['0x7F5c764cBc14f9669B88837ca1490cCa17c31607'],
},
[Network.BSC]: {
settlementAddress: '0xbbbbbBB520d69a9775E85b458C58c648259FAD5F',
chainName: 'bsc',
middleTokens: ['0x55d398326f99059fF775485246999027B3197955'],
},
},
};
57 changes: 51 additions & 6 deletions src/dex/bebop/rate-fetcher.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,30 @@
import { ETHER_ADDRESS, Network } from '../../constants';
import { IDexHelper } from '../../dex-helper';
import { Fetcher } from '../../lib/fetcher/fetcher';
import { validateAndCast } from '../../lib/validators';
import { validateAndCast, ValidationError } from '../../lib/validators';
import { Logger, Token } from '../../types';
import {
BebopLevel,
BebopPair,
BebopPricingResponse,
BebopRateFetcherConfig,
BebopTokensResponse,
} from './types';
import { pricesResponseValidator, tokensResponseValidator } from './validators';
import {
BebopPricingUpdate,
pricesResponseValidator,
tokensResponseValidator,
} from './validators';
import { WebSocketFetcher } from '../../lib/fetcher/wsFetcher';
import { utils } from 'ethers';

export function levels_from_flat_array(values: number[]): BebopLevel[] {
const levels: BebopLevel[] = [];
for (let i = 0; i < values.length; i += 2) {
levels.push([values[i], values[i + 1]]);
}
return levels;
}

export class RateFetcher {
private pricesFetcher: WebSocketFetcher<BebopPricingResponse>;
Expand All @@ -35,10 +50,16 @@ export class RateFetcher {
info: {
requestOptions: config.rateConfig.pricesReqParams,
caster: (data: unknown) => {
return validateAndCast<BebopPricingResponse>(
data,
pricesResponseValidator,
);
const dataBuffer = data as any;
const invalid = BebopPricingUpdate.verify(dataBuffer);
if (invalid) {
throw new ValidationError(invalid);
}
const update = BebopPricingUpdate.decode(dataBuffer);
const updateObject = BebopPricingUpdate.toObject(update, {
longs: Number,
});
return this.parsePricingUpdate(updateObject);
},
},
handler: this.handlePricesResponse.bind(this),
Expand Down Expand Up @@ -68,6 +89,30 @@ export class RateFetcher {
);
}

parsePricingUpdate(updateObject: any): BebopPricingResponse {
const pricingResponse: BebopPricingResponse = {};
if (!updateObject.pairs || !updateObject.pairs.length) {
this.logger.warn('Update message did not include pairs', updateObject);
return pricingResponse;
}
for (const pairBook of updateObject.pairs) {
const pair =
utils.getAddress('0x' + pairBook.base.toString('hex')) +
'/' +
utils.getAddress('0x' + pairBook.quote.toString('hex'));
const lastUpdateTs = pairBook.lastUpdateTs;
const bids = pairBook.bids ? levels_from_flat_array(pairBook.bids) : [];
const asks = pairBook.asks ? levels_from_flat_array(pairBook.asks) : [];
const bebopPair: BebopPair = {
bids,
asks,
last_update_ts: lastUpdateTs,
};
pricingResponse[pair] = bebopPair;
}
return pricingResponse;
}

start() {
this.pricesFetcher.startPolling();
this.tokensFetcher.startPolling();
Expand Down
4 changes: 4 additions & 0 deletions src/dex/bebop/validators.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import joi from 'joi';
import protobuf from 'protobufjs';

const levelValidator = joi.array().items(joi.number()).length(2);

Expand Down Expand Up @@ -27,3 +28,6 @@ export const tokensResponseValidator = joi.object({
export const blacklistResponseValidator = joi.object({
blacklist: joi.array().items(joi.string().min(1)).required(),
});

const root = protobuf.loadSync('src/dex/bebop/bebop.proto');
export const BebopPricingUpdate = root.lookupType('bebop.BebopPricingUpdate');
145 changes: 79 additions & 66 deletions src/lib/fetcher/wsFetcher.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
import { Logger } from 'log4js';
import { RequestConfig, Response } from '../../dex-helper/irequest-wrapper';
import {
connection as WebSocketConnection,
client as WebSocketClient,
} from 'websocket';

import WebSocket from 'ws';
export class SkippingRequest {
constructor(public message = '') {}
}
Expand All @@ -26,74 +22,77 @@ export type RequestInfoWithHandler<T> = {

export class WebSocketFetcher<T> {
private requests: RequestInfoWithHandler<T>;
// Time to wait before declaring connection as broken and restarting it
private timeoutInterval: number;
// Time to wait after disconnection before reconnecting
private reconnectDelay: number;
private pingTimeout: NodeJS.Timeout | undefined = undefined;
public lastFetchSucceeded: boolean = false;
private stop: boolean = true;
private ws: WebSocketClient = new WebSocketClient();
private connection: WebSocketConnection | null = null;

constructor(requestsInfo: RequestInfoWithHandler<T>, private logger: Logger) {
private connection: WebSocket | null = null;

constructor(
requestsInfo: RequestInfoWithHandler<T>,
private logger: Logger,
timeoutInterval: number = 10000,
reconnectDelay: number = 5000,
) {
this.requests = requestsInfo;
this.ws.on('connect', this.connected.bind(this));
this.ws.on('connectFailed', this.connectFailed.bind(this));
this.timeoutInterval = timeoutInterval;
this.reconnectDelay = reconnectDelay;
}

private connected(connection: WebSocketConnection) {
this.connection = connection;
private connected() {
this.logger.info(`Connected to ${this.requests.info.requestOptions.url}`);
this.connection.on('error', this.onError.bind(this));
this.connection.on('close', this.onClose.bind(this));
this.connection.on('message', this.onMessage.bind(this));
this.heartbeat();
}

private connectFailed(error: any) {
this.logger.error(`Connect Error: ${error.toString()}. Reconnecting...`);
// reconnect on errors / failures
setTimeout(() => {
this.startPolling();
}, 3000);
private heartbeat() {
clearTimeout(this.pingTimeout);
this.pingTimeout = setTimeout(() => {
this.logger.warn('No heartbeat. Terminating Connection...');
this?.connection?.terminate();
}, this.timeoutInterval);
}

private onClose() {
this.logger.info(`Connection closed. Reconnecting...`);
// reconnect on errors / failures
setTimeout(() => {
this.startPolling();
}, 3000);
this.logger.info(`Connection closed.`);
// Do not reconnect if polling is stopped
if (this.stop) {
clearTimeout(this.pingTimeout);
return;
}

this.logger.info(`Unexpected closure, Reconnecting...`);
this.reconnectWithDelay();
}

private onError(error: any) {
this.logger.error(
`Connection Error: ${error.toString()}. Stopping & Reconnecting...`,
`Websocket Error: ${error.toString()}. Stopping & Reconnecting...`,
);
this.stopPolling();

// reconnect on errors / failures
setTimeout(() => {
this.startPolling();
}, 3000);
this?.connection?.terminate();
}

private onMessage(message: any) {
if (message.type === 'utf8') {
const response = JSON.parse(message.utf8Data) as Response<T>;
const reqInfo = this.requests;
const info = reqInfo.info;
const options = reqInfo.info.requestOptions;
this.logger.debug(`(${options.url}) received new data`);

try {
const parsedData = info.caster(response);
reqInfo.handler(parsedData);
} catch (e) {
this.logger.info(e);
this.logger.info(
`(${options.url}) received incorrect data ${JSON.stringify(
response,
).replace(/(?:\r\n|\r|\n)/g, ' ')}`,
e,
);
return;
}
private onMessage(data: WebSocket.RawData) {
this.heartbeat();
const reqInfo = this.requests;
const info = reqInfo.info;
const options = reqInfo.info.requestOptions;
this.logger.debug(`(${options.url}) received new data`);

try {
const parsedData = info.caster(data);
reqInfo.handler(parsedData);
} catch (e) {
this.logger.info(e);
this.logger.info(
`(${options.url}) received incorrect data ${JSON.stringify(
data,
).replace(/(?:\r\n|\r|\n)/g, ' ')}`,
e,
);
return;
}
}

Expand All @@ -110,30 +109,44 @@ export class WebSocketFetcher<T> {
this.logger.info(
`Connecting to ${this.requests.info.requestOptions.url}...`,
);
this.ws.connect(
this.requests.info.requestOptions.url!,
undefined,
undefined,
{
const ws = new WebSocket(this.requests.info.requestOptions.url!, {
headers: {
Authorization: authorization,
name: name,
},
);
});

ws.on('open', this.connected.bind(this));
ws.on('message', this.onMessage.bind(this));
ws.on('error', this.onError.bind(this));
ws.on('close', this.onClose.bind(this));
this.connection = ws;
}

startPolling(): void {
this.stop = false;
reconnectWithDelay() {
this.logger.info(`Waiting ${this.reconnectDelay}ms before reconnecting...`);
clearTimeout(this.pingTimeout);
setTimeout(() => {
this.reconnect();
}, this.reconnectDelay);
}

reconnect() {
clearTimeout(this.pingTimeout);
this.connect();
this.logger.info(
`Connection started for ${this.requests.info.requestOptions.url}`,
);
}

startPolling(): void {
this.stop = false;
this.reconnect();
}

stopPolling() {
if (this.connection) {
this.connection.close();
}
this.stop = true;
this.connection?.terminate();
this.logger.info(
`Connection stopped for ${this.requests.info.requestOptions.url}`,
);
Expand Down
Loading

0 comments on commit b39ee21

Please sign in to comment.