Websocket eth filter system not reporting events correctly #1850

Open
CRossel87a opened this issue Sep 9, 2024 · 6 comments · Fixed by #1853
Labels: bug, linear

Comments

@CRossel87a

Seid version
name: sei
server_name:
version: v5.7.5
commit: 7e7a9ce
build_tags: netgo ledger,
go: go version go1.21.6 linux/amd64
build_deps:

Chain ID
Mainnet, pacific-1, 1329

Describe the bug
Almost no eth events are delivered through the WebSocket filter subscription (eth_subscribe with a logs filter).

To Reproduce
Steps to reproduce the behavior:

  1. wscat -c ws://127.0.0.1:8546 # Connect to the WebSocket server
  2. Subscribe to Uniswap V3 Swap events:
    {"jsonrpc":"2.0","id":1,"method":"eth_subscribe","params": ["logs", {"topics": ["0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67"]}]}
  3. Wait for swaps to occur, or swap a small amount at Oku trade, and observe which events arrive

OR

  1. Connect as above, then subscribe to Uniswap V2 Sync events:
    {"jsonrpc":"2.0","id":1,"method":"eth_subscribe","params": ["logs", {"topics": ["0x1c411e9a96e071241c2f21f7726b17ae89e3cab4c78be50e062b03a9fffbbad1"]}]}
  2. Wait for swaps to occur, or swap a small amount at Dragonswap, and observe which events arrive
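
For anyone scripting the steps above instead of pasting JSON into wscat, the request can be built programmatically. This is only a minimal sketch: `buildLogsSubscribe` and the two constant names are hypothetical helpers introduced here for illustration, and the topic hashes are copied from the steps above.

```javascript
// Hypothetical helper: builds the JSON-RPC request that opens a "logs" subscription.
// `topics` has one entry per topic position; `address` is optional.
function buildLogsSubscribe(id, topics, address) {
    const criteria = { topics };
    if (address !== undefined) {
        criteria.address = address; // restrict the subscription to one contract
    }
    return JSON.stringify({
        jsonrpc: "2.0",
        id,
        method: "eth_subscribe",
        params: ["logs", criteria],
    });
}

// Topic hashes taken verbatim from the reproduction steps.
const UNISWAP_V3_SWAP = "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67";
const UNISWAP_V2_SYNC = "0x1c411e9a96e071241c2f21f7726b17ae89e3cab4c78be50e062b03a9fffbbad1";

// Print both payloads so they can be sent over the WebSocket connection.
console.log(buildLogsSubscribe(1, [UNISWAP_V3_SWAP]));
console.log(buildLogsSubscribe(2, [UNISWAP_V2_SYNC]));
```

The resulting strings can be sent as-is over the connection opened in step 1.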

Expected behavior
All matching events are emitted to the subscriber.

CRossel87a added the bug and linear labels Sep 9, 2024
@lordshisho

lordshisho commented Sep 9, 2024

For anyone looking to confirm:

const ethers = require('ethers');

const pollingProvider = new ethers.providers.JsonRpcProvider("https://evm-rpc.sei-apis.com");
const websocketProvider = new ethers.providers.WebSocketProvider("wss://evm-ws.sei-apis.com");

const eventTracker = new Map();
const CONFIRMATION_TIMEOUT = 10000; // 10 seconds

async function listenForSwapEvents() {
    console.log("Listening for Swap events...");
    const swapTopic = ethers.utils.id("Swap(address,uint256,uint256,uint256,uint256,address)");

    const filter = {
        topics: [swapTopic],
        fromBlock: 'latest'
    };

    function handleEvent(providerType, log) {
        const eventKey = `${log.transactionHash}-${log.logIndex}`;
        
        if (!eventTracker.has(eventKey)) {
            eventTracker.set(eventKey, { providers: new Set(), timeout: null });
        }
        
        const eventInfo = eventTracker.get(eventKey);
        eventInfo.providers.add(providerType);

        if (eventInfo.providers.size === 1) {
            // First provider to detect this event, start the confirmation timeout
            eventInfo.timeout = setTimeout(() => {
                if (eventInfo.providers.size < 2) {
                    console.error(`ERROR: Event not confirmed by both providers within ${CONFIRMATION_TIMEOUT}ms`);
                    console.error(`Transaction Hash: ${log.transactionHash}`);
                    console.error(`Contract Address: ${log.address}`);
                    console.error(`Detected by: ${Array.from(eventInfo.providers).join(', ')}`);
                }
                eventTracker.delete(eventKey);
            }, CONFIRMATION_TIMEOUT);
        } else if (eventInfo.providers.size === 2) {
            // Both providers have detected the event
            clearTimeout(eventInfo.timeout);
            console.log(`\nSwap event confirmed by both providers:`);
            console.log(`Transaction Hash: ${log.transactionHash}`);
            console.log(`Contract Address: ${log.address}`);
            eventTracker.delete(eventKey);
        }
    }

    pollingProvider.on(filter, (log) => handleEvent('Polling', log));
    websocketProvider.on(filter, (log) => handleEvent('WebSocket', log));

    // Handle WebSocket disconnections. Re-running listenForSwapEvents() here would
    // register duplicate listeners on the same closed provider, so stop instead.
    websocketProvider._websocket.on("close", (code) => {
        console.error(`WebSocket disconnected with code ${code}. Restart the script to resume the comparison.`);
        process.exit(1);
    });
}

listenForSwapEvents().catch(console.error);

// Keep the script running
process.stdin.resume();

The WebSocket provider does in fact miss the majority of events for some reason:

Swap event confirmed by both providers:
Transaction Hash: 0xd111657bb13d4c9fb595d6176bfdbf7fdc1c65b968ebd61685f690d8a72a7fd7
Contract Address: 0x70BCA5F4D188D93eD202dFdA5Db9eEdA0760d2b0
ERROR: Event not confirmed by both providers within 10000ms
Transaction Hash: 0x1938d34356afa36070e9a4da3a0643be326b118b8fdbafc918f72b1c0411fa01
Contract Address: 0x8D5261cFF8d63E71C772574EbA63E64E6726EE06
Detected by: Polling
ERROR: Event not confirmed by both providers within 10000ms
Transaction Hash: 0x56235c22f15e522c4c63dc6fa5ed5e519afdbec39e7c201ccfc8eb460d43ae8d
Contract Address: 0xC75C669a62A7eCe0C8d37904b747970467432ad3
Detected by: Polling
ERROR: Event not confirmed by both providers within 10000ms
Transaction Hash: 0x7b42c52c5b11248794eb39496000eabdf9a25f6e5d8ddc74f2d9951e5d6763ff
Contract Address: 0xDfe611c5a718a0525c6261Cf1f100e8db776b4b4
Detected by: Polling
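
To put a number on the gap instead of reading it off the log, the same `transactionHash-logIndex` keying can feed a small tally. This is a hedged sketch, not part of the script above: `summarize` and the record shape are hypothetical, and the keys in the example are made-up placeholders rather than real transactions.

```javascript
// Hypothetical helper: `observations` is a list of { key, provider } records,
// where key is `${transactionHash}-${logIndex}` and provider is "Polling" or "WebSocket".
function summarize(observations) {
    const seenBy = new Map(); // key -> Set of provider names that reported it
    for (const { key, provider } of observations) {
        if (!seenBy.has(key)) {
            seenBy.set(key, new Set());
        }
        seenBy.get(key).add(provider);
    }

    let both = 0;
    let pollingOnly = 0;
    let webSocketOnly = 0;
    for (const providers of seenBy.values()) {
        const polled = providers.has("Polling");
        const pushed = providers.has("WebSocket");
        if (polled && pushed) both++;
        else if (polled) pollingOnly++;
        else if (pushed) webSocketOnly++;
    }
    return { total: seenBy.size, both, pollingOnly, webSocketOnly };
}

// Placeholder data only: one event seen by both providers, two seen by polling alone.
const summary = summarize([
    { key: "0xaa-0", provider: "Polling" },
    { key: "0xaa-0", provider: "WebSocket" },
    { key: "0xbb-3", provider: "Polling" },
    { key: "0xcc-1", provider: "Polling" },
]);
console.log(summary);
```

A large `pollingOnly` count relative to `total` would correspond to the behaviour shown in the output above.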

@CRossel87a
Author

CRossel87a commented Sep 9, 2024

Looking at the code in evmrpc/subscribe.go, we can see that the logs functionality has not been properly implemented. My good friend Claude the AI made this, and it seems to work, but it has not been vetted:

func (a *SubscriptionAPI) Logs(ctx context.Context, filter *filters.FilterCriteria) (s *rpc.Subscription, err error) {
	// Arguments to a deferred call are evaluated at the defer statement, so wrap
	// the call in a closure to record the final value of err.
	startTime := time.Now()
	defer func() { recordMetrics("eth_logs", a.connectionType, startTime, err == nil) }()

	notifier, supported := rpc.NotifierFromContext(ctx)
	if !supported {
		return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
	}

	if filter == nil {
		filter = &filters.FilterCriteria{}
	}

	rpcSub := notifier.CreateSubscription()

	// Create a channel for new head notifications
	newHeadCh := make(chan map[string]interface{}, NewHeadsListenerBuffer)

	// Subscribe to new heads
	a.newHeadListenersMtx.Lock()
	if uint64(len(a.newHeadListeners)) >= a.subscriptonConfig.newHeadLimit {
		a.newHeadListenersMtx.Unlock()
		return nil, errors.New("no new subscription can be created")
	}
	a.newHeadListeners[rpcSub.ID] = newHeadCh
	a.newHeadListenersMtx.Unlock()

	go func() {
		// Unregister the listener when the subscription ends
		defer func() {
			a.newHeadListenersMtx.Lock()
			delete(a.newHeadListeners, rpcSub.ID)
			close(newHeadCh)
			a.newHeadListenersMtx.Unlock()
		}()

		for {
			select {
			case newHead, ok := <-newHeadCh:
				if !ok {
					return
				}

				blockNumber, ok := newHead["number"].(*hexutil.Big)
				if !ok {
					continue
				}

				currentBlock := blockNumber.ToInt().Int64()

				// Check if we should process this block based on FromBlock filter
				if filter.FromBlock != nil && currentBlock < filter.FromBlock.Int64() {
					continue
				}

				// Check if we've exceeded ToBlock filter
				if filter.ToBlock != nil && currentBlock > filter.ToBlock.Int64() {
					return
				}

				// Fetch logs for the current block
				logs, _, err := a.logFetcher.GetLogsByFilters(ctx, *filter, currentBlock)
				if err != nil {
					_ = notifier.Notify(rpcSub.ID, err)
					return
				}

				// Push every matching log to the subscriber
				for _, log := range logs {
					if err := notifier.Notify(rpcSub.ID, log); err != nil {
						return
					}
				}

			case <-rpcSub.Err():
				return
			case <-notifier.Closed():
				return
			}
		}
	}()

	return rpcSub, nil
}
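
The patch above leaves the actual filtering to GetLogsByFilters. For readers comparing what each transport delivers, here is a sketch of the topic-matching rules as commonly documented for Ethereum log filters: positions are compared in order, null is a wildcard, and an array means any of its entries. `matchesTopics` is a hypothetical client-side helper, the short hex strings are illustrative placeholders, and Sei's implementation may differ in details.

```javascript
// Hypothetical helper: returns true when a log's topics satisfy a filter's topics.
// Rules assumed here: position-wise comparison, null/undefined = wildcard,
// array = any-of (empty array = wildcard), filter longer than the log = no match.
function matchesTopics(filterTopics, logTopics) {
    if (!filterTopics) {
        return true; // no topic criteria at all
    }
    if (filterTopics.length > logTopics.length) {
        return false;
    }
    return filterTopics.every((want, i) => {
        if (want === null || want === undefined) {
            return true;
        }
        const options = Array.isArray(want) ? want : [want];
        if (options.length === 0) {
            return true;
        }
        const have = logTopics[i].toLowerCase();
        return options.some((t) => t.toLowerCase() === have);
    });
}

// Illustrative placeholder topics, not real event signatures.
const A = "0xaa";
const B = "0xbb";
const C = "0xcc";

console.log(matchesTopics([A], [A, B]));       // topic0 matches
console.log(matchesTopics([null, B], [A, B])); // wildcard in position 0
console.log(matchesTopics([[A, C]], [C]));     // any-of in position 0
console.log(matchesTopics([B], [A]));          // topic0 differs
```

Applied to logs fetched by polling, a check like this gives an independent view of which events a subscription with the same filter should have delivered.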

@CRossel87a
Author

#1853 does not resolve this issue, not even close.

@cordt-sei
Contributor

Hey @CRossel87a, we tried your fix and it seemed to work partially: it started showing the 'synthetic' events for cross-environment transactions, but not all of them. Did it work fully for you?

cordt-sei reopened this Sep 17, 2024
@CRossel87a
Author

Hi,

I have not tested this in depth.

@cordt-sei
Contributor

cordt-sei commented Oct 4, 2024 via email
