Skip to content

Commit

Permalink
Merge branch 'master' into remove-validator-dep
Browse files Browse the repository at this point in the history
  • Loading branch information
edfelten authored Feb 12, 2020
2 parents fe46892 + 3b20867 commit bbe2366
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 27 deletions.
79 changes: 54 additions & 25 deletions packages/arb-provider-go/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,28 @@ func (conn *ArbConnection) SendTransaction(ctx context.Context, tx *types.Transa
//
// TODO(karalabe): Deprecate when the subscription one can return past data too.
func (conn *ArbConnection) FilterLogs(ctx context.Context, query ethereum.FilterQuery) ([]types.Log, error) {
return nil, _nyiError("FilterLogs")
var ret []types.Log
address, topics := _extractAddrTopics(query)
logInfos, err := conn.proxy.FindLogs(0, math.MaxInt32, address[:], topics)
if err != nil {
return nil, err
}
for _, logInfo := range logInfos {
outs, err := _decodeLogInfo(logInfo)
if err != nil {
return nil, err
}
ok := true
for i, targetTopic := range topics {
if targetTopic != outs.Topics[i] {
ok = false
}
}
if ok {
ret = append(ret, *outs)
}
}
return ret, nil
}

// SubscribeFilterLogs creates a background log filtering operation, returning
Expand All @@ -238,12 +259,13 @@ const subscriptionPollingInterval = 5 * time.Second
type subscription struct {
proxy ValidatorProxy
firstBlockUnseen uint64
active bool
logChan chan<- types.Log
errChan chan error
address common.Address
topics [][32]byte
unsubOnce *sync.Once
closeChan chan interface{}
wg sync.WaitGroup
}

func _extractAddrTopics(query ethereum.FilterQuery) (addr common.Address, topics [][32]byte) {
Expand Down Expand Up @@ -323,44 +345,50 @@ func newSubscription(conn *ArbConnection, query ethereum.FilterQuery, ch chan<-
sub := &subscription{
conn.proxy,
0,
true,
ch,
make(chan error, 1),
address,
topics,
&sync.Once{},
make(chan interface{}),
sync.WaitGroup{},
}
sub.wg.Add(1)
go func() {
defer sub.wg.Done()
defer sub.Unsubscribe()
ticker := time.NewTicker(subscriptionPollingInterval)
defer ticker.Stop()
for {
time.Sleep(subscriptionPollingInterval)
if !sub.active {
select {
case <- sub.closeChan:
return
}
logInfos, err := sub.proxy.FindLogs(int64(sub.firstBlockUnseen), math.MaxInt32, sub.address[:], sub.topics)
if err != nil {
sub.errChan <- err
return
}
for _, logInfo := range logInfos {
outs, err := _decodeLogInfo(logInfo)
case <- ticker.C:
logInfos, err := sub.proxy.FindLogs(int64(sub.firstBlockUnseen), math.MaxInt32, sub.address[:], sub.topics)
if err != nil {
sub.errChan <- err
return
}
ok := true
for i, targetTopic := range topics {
if targetTopic != outs.Topics[i] {
for _, logInfo := range logInfos {
outs, err := _decodeLogInfo(logInfo)
if err != nil {
sub.errChan <- err
return
}
ok := true
for i, targetTopic := range topics {
if targetTopic != outs.Topics[i] {
ok = false
}
}
if outs.BlockNumber < sub.firstBlockUnseen {
ok = false
}
}
if outs.BlockNumber < sub.firstBlockUnseen {
ok = false
}
if ok {
sub.logChan <- *outs
if sub.firstBlockUnseen <= outs.BlockNumber {
sub.firstBlockUnseen = outs.BlockNumber + 1
if ok {
sub.logChan <- *outs
if sub.firstBlockUnseen <= outs.BlockNumber {
sub.firstBlockUnseen = outs.BlockNumber + 1
}
}
}
}
Expand All @@ -373,7 +401,8 @@ func newSubscription(conn *ArbConnection, query ethereum.FilterQuery, ch chan<-
// and closes the error channel.
func (sub *subscription) Unsubscribe() {
sub.unsubOnce.Do(func() {
sub.active = false
close(sub.closeChan)
sub.wg.Wait()
close(sub.errChan)
})
}
Expand Down
4 changes: 2 additions & 2 deletions packages/arb-provider-go/validatorProxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,12 +165,12 @@ func (vp *ValidatorProxyImpl) CallMessage(contract common.Address, sender common
}
retBuf, err := hexutil.Decode(response.RawVal)
if err != nil {
log.Println("GetMessageResult error:", err)
log.Println("CallMessage error:", err)
return nil, err
}
retVal, err := value.UnmarshalValue(bytes.NewReader(retBuf))
if err != nil {
log.Println("ValProxy.GetMessageResult: UnmarshalValue returned error:", err)
log.Println("ValProxy.CallMessage: UnmarshalValue returned error:", err)
}
return retVal, err
}

0 comments on commit bbe2366

Please sign in to comment.