Skip to content

Commit

Permalink
support ipc request (#528)
Browse files Browse the repository at this point in the history
  • Loading branch information
nick199910 authored Jul 29, 2024
1 parent 8fdcdf7 commit 91f6801
Show file tree
Hide file tree
Showing 4 changed files with 215 additions and 35 deletions.
29 changes: 20 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,4 @@ tracing-subscriber = "0.3"
colored = "2.0"
evmole = "0.3.7"
semver = "1.0.22"

218 changes: 192 additions & 26 deletions src/evm/onchain/endpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@ use std::{
env,
fmt::Debug,
hash::{Hash, Hasher},
io::{Read, Write},
os::unix::net::UnixStream,
panic,
str::FromStr,
sync::Arc,
time::Duration,
time::{Duration, Instant},
};

use anyhow::{anyhow, Result};
use bytes::Bytes;
use itertools::Itertools;
use reqwest::{blocking, header::HeaderMap};
use retry::{delay::Fixed, retry_with_index, OperationResult};
use revm_interpreter::analysis::to_analysed;
Expand Down Expand Up @@ -92,21 +93,40 @@ impl FromStr for Chain {
impl Chain {
pub fn new_with_rpc_url(rpc_url: &str) -> Result<Self> {
let client = blocking::Client::new();
let body = json!({"method":"eth_chainId","params":[],"id":1,"jsonrpc":"2.0"});
let resp: Value = client
.post(rpc_url)
.header("Content-Type", "application/json")
.body(body.to_string())
.send()?
.json()?;
let body = json!({
"method": "eth_chainId",
"params": [],
"id": 1,
"jsonrpc": "2.0"
})
.to_string();

let resp: Value = if rpc_url.starts_with("http://") || rpc_url.starts_with("https://") {
// HTTP request
let response = client
.post(rpc_url)
.header("Content-Type", "application/json")
.body(body.clone())
.send()?;
response.json::<Value>()?
} else if rpc_url.starts_with("/") || rpc_url.starts_with("./") || rpc_url.starts_with("../") {
// IPC request
let mut stream = UnixStream::connect(rpc_url)?;
stream.write_all(body.as_bytes())?;

let mut response = String::new();
stream.read_to_string(&mut response)?;
serde_json::from_str(&response)?
} else {
return Err(anyhow!("Unsupported URL scheme: {}", rpc_url));
};

let chain_id = resp
.get("result")
.and_then(|result| result.as_str())
.and_then(|result| u64::from_str_radix(result.trim_start_matches("0x"), 16).ok())
.ok_or_else(|| anyhow!("Unknown chain id: {}", rpc_url))?;
.ok_or_else(|| anyhow!("Failed to parse chain id from response: {}", rpc_url))?;

// Use rpc_url instead of the default one
env::set_var("ETH_RPC_URL", rpc_url);

Ok(match chain_id {
Expand Down Expand Up @@ -700,32 +720,178 @@ impl OnChainConfig {
abi
}

fn _request(&self, method: String, params: String) -> Option<Value> {
pub fn _request(&self, method: String, params: String) -> Option<Value> {
let data = format!(
"{{\"jsonrpc\":\"2.0\", \"method\": \"{}\", \"params\": {}, \"id\": {}}}",
method, params, self.chain_id
);
self.post(self.endpoint_url.clone(), data)
.and_then(|resp| serde_json::from_str(&resp).ok())
.and_then(|json: Value| json.get("result").cloned())
.or_else(|| {
error!("failed to fetch from {}", self.endpoint_url);
None
})

// Handling HTTP request
if self.endpoint_url.starts_with("http://") || self.endpoint_url.starts_with("https://") {
let client = reqwest::blocking::Client::builder()
.timeout(Duration::from_secs(10))
.build()
.expect("Failed to create HTTP client");

return client
.post(&self.endpoint_url)
.header("Content-Type", "application/json")
.body(data)
.send()
.ok()
.and_then(|resp| resp.text().ok())
.and_then(|resp| serde_json::from_str(&resp).ok())
.and_then(|json: Value| json.get("result").cloned());
}
// Handling IPC request
else if self.endpoint_url.starts_with("/") ||
self.endpoint_url.starts_with("./") ||
self.endpoint_url.starts_with("../")
{
match UnixStream::connect(&self.endpoint_url) {
Ok(mut socket) => {
socket
.set_read_timeout(Some(Duration::from_secs(10)))
.expect("Failed to set read timeout");
socket
.set_write_timeout(Some(Duration::from_secs(5)))
.expect("Failed to set write timeout");

if let Err(e) = socket.write_all(data.as_bytes()) {
error!("Failed to write to IPC stream: {}", e);
return None;
}

let mut response = String::new();
let mut buffer = [0; 4096];
let timeout = Duration::from_secs(10);
let start_time = Instant::now();

while start_time.elapsed() < timeout {
match socket.read(&mut buffer) {
Ok(0) => {
error!("No data read from IPC stream; the stream might have been closed.");
return None;
}
Ok(n) => {
response.push_str(&String::from_utf8_lossy(&buffer[..n]));
if response.contains("\n") {
break;
}
}
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => continue,
Err(e) => {
error!("Failed to read from IPC stream: {}", e);
return None;
}
}
}

if start_time.elapsed() >= timeout {
error!("Timeout reached while reading from the IPC stream.");
return None;
}

serde_json::from_str(&response)
.ok()
.and_then(|json: Value| json.get("result").cloned())
}
Err(e) => {
error!("IPC connection failed: {}", e);
return None;
}
}
} else {
error!("Unsupported URL scheme: {}", self.endpoint_url);
None
}
}

fn _request_with_id(&self, method: String, params: String, id: u8) -> Option<Value> {
let data = format!(
"{{\"jsonrpc\":\"2.0\", \"method\": \"{}\", \"params\": {}, \"id\": {}}}",
method, params, id
);
self.post(self.endpoint_url.clone(), data)
.and_then(|resp| serde_json::from_str(&resp).ok())
.and_then(|json: Value| json.get("result").cloned())
.or_else(|| {
error!("failed to fetch from {}", self.endpoint_url);
None
})

// Handling HTTP request
if self.endpoint_url.starts_with("http://") || self.endpoint_url.starts_with("https://") {
let client = reqwest::blocking::Client::builder()
.timeout(Duration::from_secs(10))
.build()
.expect("Failed to create HTTP client");

return client
.post(&self.endpoint_url)
.header("Content-Type", "application/json")
.body(data)
.send()
.ok()
.and_then(|resp| resp.text().ok())
.and_then(|resp| serde_json::from_str(&resp).ok())
.and_then(|json: Value| json.get("result").cloned());
}
// Handling IPC request
else if self.endpoint_url.starts_with("/") ||
self.endpoint_url.starts_with("./") ||
self.endpoint_url.starts_with("../")
{
match UnixStream::connect(&self.endpoint_url) {
Ok(mut socket) => {
socket
.set_read_timeout(Some(Duration::from_secs(10)))
.expect("Failed to set read timeout");
socket
.set_write_timeout(Some(Duration::from_secs(5)))
.expect("Failed to set write timeout");

if let Err(e) = socket.write_all(data.as_bytes()) {
error!("Failed to write to IPC stream: {}", e);
return None;
}

let mut response = String::new();
let mut buffer = [0; 4096];
let timeout = Duration::from_secs(10);
let start_time = Instant::now();

while start_time.elapsed() < timeout {
match socket.read(&mut buffer) {
Ok(0) => {
error!("No data read from IPC stream; the stream might have been closed.");
return None;
}
Ok(n) => {
response.push_str(&String::from_utf8_lossy(&buffer[..n]));
if response.contains("\n") {
break;
}
}
Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => continue,
Err(e) => {
error!("Failed to read from IPC stream: {}", e);
return None;
}
}
}

if start_time.elapsed() >= timeout {
error!("Timeout reached while reading from the IPC stream.");
return None;
}

serde_json::from_str(&response)
.ok()
.and_then(|json: Value| json.get("result").cloned())
}
Err(e) => {
error!("IPC connection failed: {}", e);
return None;
}
}
} else {
error!("Unsupported URL scheme: {}", self.endpoint_url);
None
}
}

pub fn get_balance(&mut self, address: EVMAddress) -> EVMU256 {
Expand Down
2 changes: 2 additions & 0 deletions src/evm/onchain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,8 @@ where
}};
() => {};
}

// todo! get storage data from reth
host.next_slot = match self.storage_fetching {
StorageFetchingMode::Dump => {
load_data!(fetch_storage_dump, storage_dump, slot_idx)
Expand Down

0 comments on commit 91f6801

Please sign in to comment.