Skip to content

Commit

Permalink
Merge branch 'moreMetrics'
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanio committed Jun 11, 2023
2 parents 8d041df + c0d63b7 commit 6050d71
Show file tree
Hide file tree
Showing 11 changed files with 4,026 additions and 3,198 deletions.
3,896 changes: 3,896 additions & 0 deletions dashboards/gossip.json

Large diffs are not rendered by default.

3,019 changes: 0 additions & 3,019 deletions dashboards/libp2p.json

This file was deleted.

Binary file modified datadirs/datadir/dev.db
Binary file not shown.
1 change: 1 addition & 0 deletions example.env
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ export OPENSEA_API_KEY=""

export SEAPORT_GOSSIP_COLLECTION_ADDRESSES="0x942bc2d3e7a589fe5bd4a5c6ef9727dfd82f5c8a,0xe4d20bc4a845aa35b008f5f9f85e50b581df7263"
export SEAPORT_GOSSIP_LOG_LEVEL="debug"
export SEAPORT_GOSSIP_METRICS="false"

export SEAPORT_GOSSIP_INGEST_OPENSEA_ORDERS="true"
export SEAPORT_GOSSIP_GET_ALL_ORDERS_FROM_PEERS="true"
Expand Down
125 changes: 19 additions & 106 deletions src/ingestors/opensea.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import { WebSocket } from 'ws'
import { addOrder, exceedsMaxOrderLimits } from '../query/order.js'
import { RateLimit, short } from '../util/helpers.js'
import { deriveOrderHash } from '../util/order.js'
import { AuctionType, OrderEvent } from '../util/types.js'
import { OrderEvent } from '../util/types.js'

import type { SeaportGossipNode } from '../node.js'
import type { Address, ItemType, OrderJSON, OrderType } from '../util/types.js'
import type { Address, AuctionType, OrderJSON } from '../util/types.js'
import type {
BaseStreamMessage,
ItemListedEventPayload,
Expand All @@ -18,54 +18,10 @@ import type {
} from '@opensea/stream-js'
import type { RequestInit } from 'node-fetch'

enum OpenSeaOrderType {
LISTINGS,
OFFERS,
}

interface IngestorOpts {
node: SeaportGossipNode
}

interface OpenSeaOfferItem {
itemType: ItemType
token: string
identifierOrCriteria: string
startAmount: string
endAmount: string
}

interface OpenSeaConsiderationItem extends OpenSeaOfferItem {
recipient: string
}

/* eslint-disable @typescript-eslint/naming-convention */
interface OpenSeaOrder {
created_date: string
closing_date: string
listing_time: number
expiration_time: number
order_hash: string
order_type: string
protocol_data: {
parameters: {
offerer: string
offer: OpenSeaOfferItem[]
consideration: OpenSeaConsiderationItem[]
startTime: string
endTime: string
orderType: OrderType
zone: string
zoneHash: string
salt: string
conduitKey: string
totalOriginalConsiderationItems: number
counter: number
}
signature: string
}
}

export class OpenSeaOrderIngestor {
private node: SeaportGossipNode
private client: OpenSeaStreamClient
Expand Down Expand Up @@ -120,20 +76,11 @@ export class OpenSeaOrderIngestor {
| BaseStreamMessage<ItemReceivedOfferEventPayload>
| BaseStreamMessage<ItemReceivedBidEventPayload>
) {
const [chain, collectionAddress, tokenId] =
event.payload.item.nft_id.split('/')
const { payload, event_type: eventType } = event
const [chain, collectionAddress] = payload.item.nft_id.split('/')
if (chain !== 'ethereum') return
const orderHash = event.payload.order_hash
const orderType =
event.event_type === EventType.ITEM_LISTED
? OpenSeaOrderType.LISTINGS
: OpenSeaOrderType.OFFERS
const order = await this._getOrder(
collectionAddress,
tokenId,
orderHash,
orderType
)
const orderHash = payload.order_hash
const order = await this._orderToOrderJSON((payload as any).protocol_data) // protocol_data is not yet typed in stream-js
if (order === undefined) return
if (await exceedsMaxOrderLimits(order, this.node)) return
const [isAdded, metadata] = await addOrder(
Expand All @@ -143,11 +90,9 @@ export class OpenSeaOrderIngestor {
false,
order.auctionType
)
let log = `OpenSea Ingestor: New event ${
event.event_type
} for collection ${short(collectionAddress)}, order hash: ${short(
orderHash
)}`
let log = `OpenSea Ingestor: New event ${eventType} for collection ${short(
collectionAddress
)}, order hash: ${short(orderHash)}`
if (isAdded) {
log += '. Added order to db.'
} else {
Expand All @@ -164,59 +109,26 @@ export class OpenSeaOrderIngestor {
blockHash: metadata.lastValidatedBlockHash ?? ethers.constants.HashZero,
}
await this.node.publishEvent(gossipsubEvent)
this.node.metrics?.ordersIngestedOpenSea.inc()
}

private async _getOrder(
address: Address,
tokenId: string,
orderHash: string,
type: OpenSeaOrderType
) {
const params = new URLSearchParams({
asset_contract_address: address,
token_ids: tokenId,
order_by: 'created_date',
order_direction: 'desc',
this.node.metrics?.ordersIngestedOpenSea.inc({
addedToDB: isAdded ? 'yes' : 'no',
})
const base =
type === OpenSeaOrderType.LISTINGS
? this.LISTINGS_ENDPOINT
: this.OFFERS_ENDPOINT
await this.limit()
try {
const response = await this._fetch(`${base}?${params.toString()}`)
const data: any = await response.json()
if (data.orders === undefined || data.orders.length === 0)
return undefined
const orders = data.orders as OpenSeaOrder[]
const order = orders.find((o) => o.order_hash === orderHash)
if (order === undefined) return undefined
return this._orderToOrderJSON(order)
} catch (error: any) {
this.node.logger.error(
`Error fetching order from OpenSea: ${error.message ?? error}`
)
}
}

private _orderToOrderJSON(
order: OpenSeaOrder
): OrderJSON & { auctionType: AuctionType } {
const { parameters, signature } = order.protocol_data
private async _orderToOrderJSON(
protocolData: any
): Promise<OrderJSON & { auctionType: AuctionType }> {
const { parameters, signature } = protocolData
delete (parameters as any).totalOriginalConsiderationItems
let auctionType = AuctionType.BASIC
if (order.order_type === 'english') auctionType = AuctionType.ENGLISH
if (order.order_type === 'dutch') auctionType = AuctionType.DUTCH
return {
const order = {
...parameters,
startTime: Number(parameters.startTime),
endTime: Number(parameters.endTime),
salt: BigNumber.from(parameters.salt).toString(),
signature,
chainId: '1',
auctionType,
}
order.auctionType = await this.node.validator.auctionType(order)
return order
}

private async _getCollectionSlug(
Expand Down Expand Up @@ -245,6 +157,7 @@ export class OpenSeaOrderIngestor {
...opts,
headers: {
accept: 'application/json',
// eslint-disable-next-line @typescript-eslint/naming-convention
'X-API-KEY': this.node.opts.openSeaAPIKey,
...opts.headers,
},
Expand Down
21 changes: 18 additions & 3 deletions src/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import { isValidAddress, short } from './util/helpers.js'
import { createWinstonLogger } from './util/log.js'
import { setupMetrics } from './util/metrics.js'
import { deriveOrderHash } from './util/order.js'
import { ProviderWithMetrics } from './util/provider.js'
import {
decodeGossipsubEvent,
emptyOrderJSON,
Expand Down Expand Up @@ -124,8 +125,16 @@ export class SeaportGossipNode {

this.provider =
typeof this.opts.web3Provider === 'string'
? new ethers.providers.JsonRpcProvider(this.opts.web3Provider)
: this.opts.web3Provider
? new ProviderWithMetrics(
this.opts.web3Provider,
undefined,
this.metrics
)
: new ProviderWithMetrics(
this.opts.web3Provider.connection.url,
this.opts.web3Provider._network,
this.metrics
)
this.seaport = new ethers.Contract(
this.opts.seaportAddress,
ISeaport,
Expand All @@ -140,6 +149,10 @@ export class SeaportGossipNode {
) {
this.ingestor = new OpenSeaOrderIngestor({ node: this })
}

process.on('uncaughtException', (error) => {
this.logger.error(`uncaughtException: ${JSON.stringify(error)}`)
})
}

/**
Expand Down Expand Up @@ -169,7 +182,9 @@ export class SeaportGossipNode {
msgIdFn: gossipsubMsgIdFn,
})

const metrics = this.opts.metrics ? prometheusMetrics() : undefined
const metrics = this.opts.metrics
? prometheusMetrics({ preserveExistingMetrics: true })
: undefined

const libp2pOpts = {
peerId: this.opts.peerId ?? undefined,
Expand Down
9 changes: 9 additions & 0 deletions src/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,11 @@ export const handleProtocol = async (

logger.debug(`Received protocol message ${protocol.name} from ${shortPeerId}`)

node.metrics?.wireMessagesTotal.inc({
name: protocol.name,
peerId: peer.toString(),
})

switch (protocol.name) {
case 'GetOrders': {
const { reqId, hashes } = orderHashesDecode(message)
Expand Down Expand Up @@ -292,6 +297,10 @@ export const handleProtocol = async (
logger.debug(
`Received ${hashes.length} order hashes from ${shortPeerId} (reqId: ${reqId})`
)
node.metrics?.orderHashesReceived.inc(
{ peerId: peer.toString() },
hashes.length
)
return
}
case 'GetOrderCount': {
Expand Down
Loading

0 comments on commit 6050d71

Please sign in to comment.