From 91f68011f68c151260ce213984ed09da173fb320 Mon Sep 17 00:00:00 2001 From: Nick <49566599+nick199910@users.noreply.github.com> Date: Mon, 29 Jul 2024 10:00:53 +0800 Subject: [PATCH] support ipc request (#528) --- Cargo.lock | 29 +++-- Cargo.toml | 1 + src/evm/onchain/endpoints.rs | 218 ++++++++++++++++++++++++++++++----- src/evm/onchain/mod.rs | 2 + 4 files changed, 215 insertions(+), 35 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a5f6f952..6edf82c8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7349,6 +7349,18 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "mio" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4569e456d394deccd22ce1c1913e6ea0e54519f577285001215d33557431afe4" +dependencies = [ + "hermit-abi 0.3.9", + "libc", + "wasi 0.11.0+wasi-snapshot-preview1", + "windows-sys 0.52.0", +] + [[package]] name = "miow" version = "0.3.7" @@ -13166,22 +13178,21 @@ checksum = "c7c4ceeeca15c8384bbc3e011dbd8fccb7f068a440b752b7d9b32ceb0ca0e2e8" [[package]] name = "tokio" -version = "1.37.0" +version = "1.39.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787" +checksum = "daa4fb1bc778bd6f04cbfc4bb2d06a7396a8f299dc33ea1900cedaa316f467b1" dependencies = [ "backtrace", "bytes", "libc", - "mio 0.8.11", - "num_cpus", + "mio 1.0.1", "parking_lot 0.12.1", "pin-project-lite", "signal-hook-registry", "socket2 0.5.6", - "tokio-macros 2.2.0", + "tokio-macros 2.4.0", "tracing", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -13206,9 +13217,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "2.2.0" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" +checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2 1.0.79", "quote 1.0.35", @@ -15548,7 +15559,7 @@ dependencies = [ "tinyvec_macros", "tokio", "tokio-io-timeout", - "tokio-macros 2.2.0", + "tokio-macros 2.4.0", "tokio-retry", "tokio-rustls 0.23.4", "tokio-rustls 0.24.1", diff --git a/Cargo.toml b/Cargo.toml index e8eebc48..a69f0a34 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -128,3 +128,4 @@ tracing-subscriber = "0.3" colored = "2.0" evmole = "0.3.7" semver = "1.0.22" + diff --git a/src/evm/onchain/endpoints.rs b/src/evm/onchain/endpoints.rs index 54868a17..0a6331f6 100644 --- a/src/evm/onchain/endpoints.rs +++ b/src/evm/onchain/endpoints.rs @@ -3,15 +3,16 @@ use std::{ env, fmt::Debug, hash::{Hash, Hasher}, + io::{Read, Write}, + os::unix::net::UnixStream, panic, str::FromStr, sync::Arc, - time::Duration, + time::{Duration, Instant}, }; use anyhow::{anyhow, Result}; use bytes::Bytes; -use itertools::Itertools; use reqwest::{blocking, header::HeaderMap}; use retry::{delay::Fixed, retry_with_index, OperationResult}; use revm_interpreter::analysis::to_analysed; @@ -92,21 +93,40 @@ impl FromStr for Chain { impl Chain { pub fn new_with_rpc_url(rpc_url: &str) -> Result { let client = blocking::Client::new(); - let body = json!({"method":"eth_chainId","params":[],"id":1,"jsonrpc":"2.0"}); - let resp: Value = client - .post(rpc_url) - .header("Content-Type", "application/json") - .body(body.to_string()) - .send()? - .json()?; + let body = json!({ + "method": "eth_chainId", + "params": [], + "id": 1, + "jsonrpc": "2.0" + }) + .to_string(); + + let resp: Value = if rpc_url.starts_with("http://") || rpc_url.starts_with("https://") { + // HTTP request + let response = client + .post(rpc_url) + .header("Content-Type", "application/json") + .body(body.clone()) + .send()?; + response.json::()? + } else if rpc_url.starts_with("/") || rpc_url.starts_with("./") || rpc_url.starts_with("../") { + // IPC request + let mut stream = UnixStream::connect(rpc_url)?; + stream.write_all(body.as_bytes())?; + + let mut response = String::new(); + stream.read_to_string(&mut response)?; + serde_json::from_str(&response)? + } else { + return Err(anyhow!("Unsupported URL scheme: {}", rpc_url)); + }; let chain_id = resp .get("result") .and_then(|result| result.as_str()) .and_then(|result| u64::from_str_radix(result.trim_start_matches("0x"), 16).ok()) - .ok_or_else(|| anyhow!("Unknown chain id: {}", rpc_url))?; + .ok_or_else(|| anyhow!("Failed to parse chain id from response: {}", rpc_url))?; - // Use rpc_url instead of the default one env::set_var("ETH_RPC_URL", rpc_url); Ok(match chain_id { @@ -700,18 +720,91 @@ impl OnChainConfig { abi } - fn _request(&self, method: String, params: String) -> Option { + pub fn _request(&self, method: String, params: String) -> Option { let data = format!( "{{\"jsonrpc\":\"2.0\", \"method\": \"{}\", \"params\": {}, \"id\": {}}}", method, params, self.chain_id ); - self.post(self.endpoint_url.clone(), data) - .and_then(|resp| serde_json::from_str(&resp).ok()) - .and_then(|json: Value| json.get("result").cloned()) - .or_else(|| { - error!("failed to fetch from {}", self.endpoint_url); - None - }) + + // Handling HTTP request + if self.endpoint_url.starts_with("http://") || self.endpoint_url.starts_with("https://") { + let client = reqwest::blocking::Client::builder() + .timeout(Duration::from_secs(10)) + .build() + .expect("Failed to create HTTP client"); + + return client + .post(&self.endpoint_url) + .header("Content-Type", "application/json") + .body(data) + .send() + .ok() + .and_then(|resp| resp.text().ok()) + .and_then(|resp| serde_json::from_str(&resp).ok()) + .and_then(|json: Value| json.get("result").cloned()); + } + // Handling IPC request + else if self.endpoint_url.starts_with("/") || + self.endpoint_url.starts_with("./") || + self.endpoint_url.starts_with("../") + { + match UnixStream::connect(&self.endpoint_url) { + Ok(mut socket) => { + socket + .set_read_timeout(Some(Duration::from_secs(10))) + .expect("Failed to set read timeout"); + socket + .set_write_timeout(Some(Duration::from_secs(5))) + .expect("Failed to set write timeout"); + + if let Err(e) = socket.write_all(data.as_bytes()) { + error!("Failed to write to IPC stream: {}", e); + return None; + } + + let mut response = String::new(); + let mut buffer = [0; 4096]; + let timeout = Duration::from_secs(10); + let start_time = Instant::now(); + + while start_time.elapsed() < timeout { + match socket.read(&mut buffer) { + Ok(0) => { + error!("No data read from IPC stream; the stream might have been closed."); + return None; + } + Ok(n) => { + response.push_str(&String::from_utf8_lossy(&buffer[..n])); + if response.contains("\n") { + break; + } + } + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => continue, + Err(e) => { + error!("Failed to read from IPC stream: {}", e); + return None; + } + } + } + + if start_time.elapsed() >= timeout { + error!("Timeout reached while reading from the IPC stream."); + return None; + } + + serde_json::from_str(&response) + .ok() + .and_then(|json: Value| json.get("result").cloned()) + } + Err(e) => { + error!("IPC connection failed: {}", e); + return None; + } + } + } else { + error!("Unsupported URL scheme: {}", self.endpoint_url); + None + } } fn _request_with_id(&self, method: String, params: String, id: u8) -> Option { @@ -719,13 +812,86 @@ impl OnChainConfig { "{{\"jsonrpc\":\"2.0\", \"method\": \"{}\", \"params\": {}, \"id\": {}}}", method, params, id ); - self.post(self.endpoint_url.clone(), data) - .and_then(|resp| serde_json::from_str(&resp).ok()) - .and_then(|json: Value| json.get("result").cloned()) - .or_else(|| { - error!("failed to fetch from {}", self.endpoint_url); - None - }) + + // Handling HTTP request + if self.endpoint_url.starts_with("http://") || self.endpoint_url.starts_with("https://") { + let client = reqwest::blocking::Client::builder() + .timeout(Duration::from_secs(10)) + .build() + .expect("Failed to create HTTP client"); + + return client + .post(&self.endpoint_url) + .header("Content-Type", "application/json") + .body(data) + .send() + .ok() + .and_then(|resp| resp.text().ok()) + .and_then(|resp| serde_json::from_str(&resp).ok()) + .and_then(|json: Value| json.get("result").cloned()); + } + // Handling IPC request + else if self.endpoint_url.starts_with("/") || + self.endpoint_url.starts_with("./") || + self.endpoint_url.starts_with("../") + { + match UnixStream::connect(&self.endpoint_url) { + Ok(mut socket) => { + socket + .set_read_timeout(Some(Duration::from_secs(10))) + .expect("Failed to set read timeout"); + socket + .set_write_timeout(Some(Duration::from_secs(5))) + .expect("Failed to set write timeout"); + + if let Err(e) = socket.write_all(data.as_bytes()) { + error!("Failed to write to IPC stream: {}", e); + return None; + } + + let mut response = String::new(); + let mut buffer = [0; 4096]; + let timeout = Duration::from_secs(10); + let start_time = Instant::now(); + + while start_time.elapsed() < timeout { + match socket.read(&mut buffer) { + Ok(0) => { + error!("No data read from IPC stream; the stream might have been closed."); + return None; + } + Ok(n) => { + response.push_str(&String::from_utf8_lossy(&buffer[..n])); + if response.contains("\n") { + break; + } + } + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => continue, + Err(e) => { + error!("Failed to read from IPC stream: {}", e); + return None; + } + } + } + + if start_time.elapsed() >= timeout { + error!("Timeout reached while reading from the IPC stream."); + return None; + } + + serde_json::from_str(&response) + .ok() + .and_then(|json: Value| json.get("result").cloned()) + } + Err(e) => { + error!("IPC connection failed: {}", e); + return None; + } + } + } else { + error!("Unsupported URL scheme: {}", self.endpoint_url); + None + } } pub fn get_balance(&mut self, address: EVMAddress) -> EVMU256 { diff --git a/src/evm/onchain/mod.rs b/src/evm/onchain/mod.rs index 37c960eb..19c5107e 100644 --- a/src/evm/onchain/mod.rs +++ b/src/evm/onchain/mod.rs @@ -211,6 +211,8 @@ where }}; () => {}; } + + // todo! get storage data from reth host.next_slot = match self.storage_fetching { StorageFetchingMode::Dump => { load_data!(fetch_storage_dump, storage_dump, slot_idx)