Skip to content

Commit

Permalink
Merge pull request #5 from supernovahs/supernovahs/examples
Browse files Browse the repository at this point in the history
feat(examples) : get operator info on service
  • Loading branch information
supernovahs authored May 15, 2024
2 parents 97b9f73 + e5c6da7 commit 418ede4
Show file tree
Hide file tree
Showing 9 changed files with 209 additions and 98 deletions.
20 changes: 20 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ members = [ "crates/chainio/clients/avsregistry/",
"crates/services/operatorsinfo/",
"crates/types/",
"crates/metrics/",
"crates/types/"]
"crates/types/",
"examples/info-operator-service/"
]

resolver = "2"

Expand Down Expand Up @@ -62,6 +64,7 @@ eigensdk-metrics-collectors-rpc-calls = {version = "0.0.1-alpha",path = "crates/
eigensdk-services-avsregistry = {path = "crates/services/avsregistry"}
eigensdk-services-bls_aggregation = {path = "crates/services/bls_aggregation"}
eigensdk-services-operatorsinfo = {path = "crates/services/operatorsinfo"}
info-operator-service = {path = "examples/info-operator-service"}
tokio = {version = "1.37.0" , features = ["test-util", "full","sync"] }
futures-util = "0.3.30"
thiserror = "1.0"
Expand Down
78 changes: 45 additions & 33 deletions crates/chainio/clients/avsregistry/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use alloy_primitives::{Address, Bytes, FixedBytes, B256, U256};
use alloy_provider::{Provider, ProviderBuilder};
use alloy_rpc_types::Filter;
use alloy_sol_types::sol;
use ark_ff::Zero;
use eigensdk_types::operator::{bitmap_to_quorum_ids, BLSApkRegistry, OperatorPubKeys};
use num_bigint::BigInt;
use std::collections::HashMap;
Expand Down Expand Up @@ -36,7 +37,6 @@ sol!(
"../../../../crates/contracts/bindings/utils/json/OperatorStateRetriever.json"
);

use BLSApkRegistry::{G1Point, G2Point};
/// Avs Registry chainreader
#[derive(Debug, Clone)]
pub struct AvsRegistryChainReader {
Expand All @@ -52,7 +52,7 @@ trait AvsRegistryReader {
}

impl AvsRegistryChainReader {
fn new(
pub fn new(
registry_coordinator_addr: Address,
bls_apk_registry_addr: Address,
operator_state_retriever: Address,
Expand Down Expand Up @@ -415,44 +415,59 @@ impl AvsRegistryChainReader {
pub async fn query_existing_registered_operator_pub_keys(
&self,
start_block: u64,
stop_block: u64,
mut stop_block: u64,
) -> Result<(Vec<Address>, Vec<OperatorPubKeys>), Box<dyn std::error::Error>> {
let provider = ProviderBuilder::new()
.with_recommended_fillers()
.on_builtin(&self.provider)
.await?;

let filter = Filter::new()
.select(start_block..stop_block)
.event("NewPubkeyRegistration(address,(uint256,uint256),(uint256[2],uint256[2]))")
.address(self.bls_apk_registry_addr);

let logs = provider.get_logs(&filter).await?;

debug!(transactionLogs = ?logs, "avsRegistryChainReader.QueryExistingRegisteredOperatorPubKeys");
let query_block_range = 1024;
let current_block_number = provider.get_block_number().await?;
if stop_block.is_zero() {
stop_block = current_block_number;
}
println!("start block :{}", start_block);
println!("stop block {}", stop_block);
let mut i = start_block;
let mut operator_addresses: Vec<Address> = vec![];
let mut operator_pub_keys: Vec<OperatorPubKeys> = vec![];
while i <= stop_block {
let mut to_block = i + (query_block_range - 1);
if to_block > stop_block {
to_block = stop_block;
}
println!("to block{}", to_block);
println!("bls apk address :{}", self.bls_apk_registry_addr);
let filter = Filter::new()
.select(i..to_block)
.event("NewPubkeyRegistration(address,(uint256,uint256),(uint256[2],uint256[2]))")
.address(self.bls_apk_registry_addr);

let logs = provider.get_logs(&filter).await?;
println!("logs length {:?}", logs.len());
debug!(transactionLogs = ?logs, "avsRegistryChainReader.QueryExistingRegisteredOperatorPubKeys");

for (i, v_log) in logs.iter().enumerate() {
let pub_key_reg_option = v_log
.log_decode::<BLSApkRegistry::NewPubkeyRegistration>()
.ok();
if let Some(pub_key_reg) = pub_key_reg_option {
let data = pub_key_reg.data();
let operator_addr = data.operator;
operator_addresses.push(operator_addr);
let g1_pub_key = data.pubkeyG1.clone();
let g2_pub_key = data.pubkeyG2.clone();

let operator_pub_key = OperatorPubKeys {
g1_pub_key: g1_pub_key,
g2_pub_key: g2_pub_key,
};

operator_pub_keys.push(operator_pub_key);
for (_, v_log) in logs.iter().enumerate() {
let pub_key_reg_option = v_log
.log_decode::<BLSApkRegistry::NewPubkeyRegistration>()
.ok();
if let Some(pub_key_reg) = pub_key_reg_option {
let data = pub_key_reg.data();
let operator_addr = data.operator;
operator_addresses.push(operator_addr);
let g1_pub_key = data.pubkeyG1.clone();
let g2_pub_key = data.pubkeyG2.clone();

let operator_pub_key = OperatorPubKeys {
g1_pub_key: g1_pub_key,
g2_pub_key: g2_pub_key,
};

operator_pub_keys.push(operator_pub_key);
}
}
i += 1024;
}

Ok((operator_addresses, operator_pub_keys))
}

Expand All @@ -470,9 +485,6 @@ impl AvsRegistryChainReader {

let query_block_range = 10000;

let contract_registry_coordinator =
RegistryCoordinator::new(self.registry_coordinator_addr, &provider);

let mut i = start_block;

while i <= stop_block {
Expand Down
12 changes: 4 additions & 8 deletions crates/chainio/clients/avsregistry/src/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,17 @@ use BLSApkRegistry::{BLSApkRegistryEvents, BLSApkRegistryInstance, NewPubkeyRegi
/// AvsRegistry Chain Subscriber struct
#[derive(Debug)]
pub struct AvsRegistryChainSubscriber {
bls_apk_registry: BLSApkRegistryEvents,
provider: String,
}

impl AvsRegistryChainSubscriber {
fn new(bls_apk_registry: BLSApkRegistryEvents, provider: String) -> Self {
return AvsRegistryChainSubscriber {
bls_apk_registry: bls_apk_registry,
provider,
};
pub fn new(provider: String) -> Self {
return AvsRegistryChainSubscriber { provider };
}

async fn build_avs_registry_chain_reader(
pub async fn build(
&self,
bls_apk_registry_addr: Address,
provider: String,
) -> Result<
BLSApkRegistryInstance<
BoxTransport,
Expand All @@ -60,6 +55,7 @@ impl AvsRegistryChainSubscriber {
return Ok(bls_apk_reg);
}

/// Utility function that returns new pubkey registration filter
pub async fn get_new_pub_key_registration_filter<'a>(
&self,
) -> Result<Filter, Box<dyn std::error::Error>> {
Expand Down
5 changes: 4 additions & 1 deletion crates/services/operatorsinfo/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,7 @@ alloy-primitives.workspace = true
alloy-provider.workspace = true
alloy-transport-ws.workspace = true
futures-util.workspace = true
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
tracing.workspace = true
anyhow = "1.0.83"
alloy-rpc-types.workspace = true
103 changes: 48 additions & 55 deletions crates/services/operatorsinfo/src/operatorsinfo_inmemory.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
use alloy_sol_types::sol;
use eigensdk_client_avsregistry::{
reader::AvsRegistryChainReader, subscriber::AvsRegistryChainSubscriber,
};

// use eigensdk_types::{G1Point,G2Point};
use alloy_primitives::Address;
use alloy_provider::{Provider, ProviderBuilder};
use alloy_rpc_types::Filter;
use alloy_transport_ws::WsConnect;
use anyhow::Result;
use eigensdk_client_avsregistry::{
reader::AvsRegistryChainReader, subscriber::AvsRegistryChainSubscriber,
};
use eigensdk_types::operator::BLSApkRegistry::{self, G1Point, G2Point};
use eigensdk_types::operator::{operator_id_from_g1_pub_key, OperatorPubKeys};
use eyre::Result;
use futures_util::{stream, StreamExt};
use futures_util::StreamExt;
use std::collections::HashMap;
use std::error::Error as StdError;
use tokio::sync::{
mpsc,
mpsc::UnboundedSender,
Expand All @@ -32,14 +31,15 @@ enum OperatorsInfoMessage {
Get(Address, Sender<Option<OperatorPubKeys>>),
}

type BoxedError = Box<dyn StdError + Send + Sync>;

impl OperatorInfoServiceInMemory {
pub async fn new(
avs_registry_subscriber: AvsRegistryChainSubscriber,
avs_registry_chain_reader: AvsRegistryChainReader,
web_socket: String,
) -> Self {
let (pubkeys_tx, mut pubkeys_rx) = mpsc::unbounded_channel();

let mut operator_info_data = HashMap::new();

let mut operator_addr_to_id = HashMap::new();
Expand Down Expand Up @@ -71,53 +71,44 @@ impl OperatorInfoServiceInMemory {
}
}

#[tokio::main]
pub async fn start_service(&self) -> Result<()> {
pub async fn start_service(&self, start_block: u64, end_block: u64) -> Result<()> {
// query past operator registrations
self.query_past_registered_operator_events_and_fill_db()
self.query_past_registered_operator_events_and_fill_db(start_block, end_block)
.await;

let filter_result = self
.avs_registry_subscriber
.get_new_pub_key_registration_filter()
.await;

match filter_result {
Ok(filter) => {
let ws = WsConnect::new(&self.ws);
let provider = ProviderBuilder::new().on_ws(ws).await?;

let mut subcription_new_operator_registration_stream =
provider.subscribe_logs(&filter).await?;
let mut stream = subcription_new_operator_registration_stream.into_stream();
while let Some(log) = stream.next().await {
let data = log
.log_decode::<BLSApkRegistry::NewPubkeyRegistration>()
.ok();

if let Some(new_pub_key_event) = data {
let event_data = new_pub_key_event.data();
let operator_pub_key = OperatorPubKeys {
g1_pub_key: G1Point {
X: event_data.pubkeyG1.X,
Y: event_data.pubkeyG1.Y,
},
g2_pub_key: G2Point {
X: event_data.pubkeyG2.X,
Y: event_data.pubkeyG2.Y,
},
};
// send message
let _ = self.pub_keys.send(OperatorsInfoMessage::InsertOperatorInfo(
event_data.operator,
operator_pub_key,
));
}
}
let ws = WsConnect::new(&self.ws);
let provider = ProviderBuilder::new().on_ws(ws).await?;
let current_block_number = provider.get_block_number().await?;
let filter = Filter::new()
.event("NewPubkeyRegistration(address,(uint256,uint256),(uint256[2],uint256[2]))")
.from_block(current_block_number);

let subcription_new_operator_registration_stream = provider.subscribe_logs(&filter).await?;
let mut stream = subcription_new_operator_registration_stream.into_stream();
while let Some(log) = stream.next().await {
let data = log
.log_decode::<BLSApkRegistry::NewPubkeyRegistration>()
.ok();

if let Some(new_pub_key_event) = data {
let event_data = new_pub_key_event.data();
let operator_pub_key = OperatorPubKeys {
g1_pub_key: G1Point {
X: event_data.pubkeyG1.X,
Y: event_data.pubkeyG1.Y,
},
g2_pub_key: G2Point {
X: event_data.pubkeyG2.X,
Y: event_data.pubkeyG2.Y,
},
};
// send message
let _ = self.pub_keys.send(OperatorsInfoMessage::InsertOperatorInfo(
event_data.operator,
operator_pub_key,
));
}
Err(_) => {}
}

Ok(())
}

Expand All @@ -129,15 +120,17 @@ impl OperatorInfoServiceInMemory {
responder_rx.await.unwrap_or(None)
}

pub async fn query_past_registered_operator_events_and_fill_db(&self) {
pub async fn query_past_registered_operator_events_and_fill_db(
&self,
start_block: u64,
end_block: u64,
) {
let (operator_address, operator_pub_keys) = self
.avs_registry_reader
.query_existing_registered_operator_pub_keys(0, 0)
.query_existing_registered_operator_pub_keys(start_block, end_block)
.await
.unwrap();

for (i, address) in operator_address.iter().enumerate() {
// let mut pub_keys = map.lock().unwrap();
let message =
OperatorsInfoMessage::InsertOperatorInfo(*address, operator_pub_keys[i].clone());
let _ = self.pub_keys.send(message);
Expand Down
8 changes: 8 additions & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Examples

Examples demonstrating how to interact with Eigen layer contracts using eigensdk-rs crates.


## OperatorsInfo


Loading

0 comments on commit 418ede4

Please sign in to comment.