Skip to content

Commit

Permalink
feat: small wasm support
Browse files Browse the repository at this point in the history
  • Loading branch information
driftluo committed Oct 22, 2024
1 parent 9dd17ab commit 7745dd2
Show file tree
Hide file tree
Showing 39 changed files with 643 additions and 321 deletions.
153 changes: 122 additions & 31 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ ckb-app-config = { path = "../util/app-config", version = "= 0.119.0-pre" }
ckb-logger = { path = "../util/logger", version = "= 0.119.0-pre" }
ckb-error = { path = "../error", version = "= 0.119.0-pre" }
libc = "0.2"
rocksdb = { package = "ckb-rocksdb", version ="=0.21.1", features = ["snappy"], default-features = false }
rocksdb = { package = "ckb-rocksdb", version = "=0.21.1", features = [
"snappy",
], default-features = false }
ckb-db-schema = { path = "../db-schema", version = "= 0.119.0-pre" }

[dev-dependencies]
Expand Down
4 changes: 3 additions & 1 deletion error/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,6 @@ repository = "https://github.com/nervosnetwork/ckb"
thiserror = "1.0.22"
anyhow = "1.0.34"
ckb-occupied-capacity = { path = "../util/occupied-capacity", version = "= 0.119.0-pre" }
derive_more = { version = "0.99.0", default-features = false, features = ["display"] }
derive_more = { version = "1", default-features = false, features = [
"display",
] }
2 changes: 1 addition & 1 deletion error/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ impl fmt::Display for AnyError {

impl fmt::Debug for AnyError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.0.fmt(f)
Display::fmt(&self.0, f)
}
}
/// Return whether the error's kind is `InternalErrorKind::Database`
Expand Down
16 changes: 14 additions & 2 deletions network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ tokio-util = { version = "0.7", features = ["codec"] }
futures = "0.3"
ckb-systemtime = { path = "../util/systemtime", version = "= 0.119.0-pre" }
lazy_static = { version = "1.3.0", optional = true }
bs58 = { version = "0.4.0", optional = true }
bs58 = { version = "0.5.0", optional = true }
sentry = { version = "0.26.0", optional = true }
faster-hex = { version = "0.6", optional = true }
ckb-hash = { path = "../util/hash", version = "= 0.119.0-pre" }
Expand All @@ -34,15 +34,26 @@ ipnetwork = "0.18"
serde_json = "1.0"
bloom-filters = "0.1"
ckb-spawn = { path = "../util/spawn", version = "= 0.119.0-pre" }
socket2 = "0.5"
bitflags = "1.0"
p2p = { version = "0.6.1", package = "tentacle", default-features = false }

[target.'cfg(not(target_family = "wasm"))'.dependencies]
p2p = { version = "0.6.1", package = "tentacle", features = [
"upnp",
"parking_lot",
"openssl-vendored",
"ws",
] }
socket2 = "0.5"

[target.'cfg(target_family = "wasm")'.dependencies]
p2p = { version = "0.6.1", package = "tentacle", default-features = false, features = [
"wasm-timer",
] }

[target.'cfg(all(target_family = "wasm", target_os = "unknown"))'.dependencies]
web-time = "1.1.0"

[features]
with_sentry = ["sentry"]
with_dns_seeding = [
Expand All @@ -63,6 +74,7 @@ once_cell = "1.8.0"
ckb-systemtime = { path = "../util/systemtime", version = "= 0.119.0-pre", features = [
"enable_faketime",
] }
ckb-app-config = { path = "../util/app-config", version = "= 0.119.0-pre" }

[[bench]]
name = "peer_store"
Expand Down
4 changes: 2 additions & 2 deletions network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ pub use crate::{
peer_registry::PeerRegistry,
peer_store::Score,
protocols::{
identify::Flags, support_protocols::SupportProtocols, CKBProtocol, CKBProtocolContext,
CKBProtocolHandler, PeerIndex,
identify::Flags, support_protocols::SupportProtocols, BoxedCKBProtocolContext, CKBProtocol,
CKBProtocolContext, CKBProtocolHandler, PeerIndex,
},
};
pub use p2p::{
Expand Down
112 changes: 65 additions & 47 deletions network/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ use rand::prelude::IteratorRandom;
#[cfg(feature = "with_sentry")]
use sentry::{capture_message, with_scope, Level};
use std::sync::mpsc;
#[cfg(not(target_family = "wasm"))]
use std::time::{Duration, Instant};
use std::{
borrow::Cow,
cmp::max,
Expand All @@ -55,10 +57,11 @@ use std::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread,
time::{Duration, Instant},
thread, usize,
};
use tokio::{self, sync::oneshot};
#[cfg(all(target_family = "wasm", target_os = "unknown"))]
use web_time::{Duration, Instant};

const P2P_SEND_TIMEOUT: Duration = Duration::from_secs(6);
const P2P_TRY_SEND_INTERVAL: Duration = Duration::from_millis(100);
Expand Down Expand Up @@ -92,6 +95,7 @@ pub struct NetworkState {
impl NetworkState {
/// Init from config
pub fn from_config(config: NetworkConfig) -> Result<NetworkState, Error> {
#[cfg(not(target_family = "wasm"))]
config.create_dir_if_not_exists()?;
let local_private_key = config.fetch_private_key()?;
let local_peer_id = local_private_key.peer_id();
Expand Down Expand Up @@ -890,38 +894,23 @@ impl NetworkService {
};
service_builder = service_builder
.handshake_type(network_state.local_private_key.clone().into())
.upnp(config.upnp)
.yamux_config(yamux_config)
.forever(true)
.max_connection_number(1024)
.set_send_buffer_size(config.max_send_buffer())
.set_channel_size(config.channel_size())
.timeout(Duration::from_secs(5));

#[cfg(not(target_family = "wasm"))]
{
service_builder = service_builder.upnp(config.upnp);
}

#[cfg(target_os = "linux")]
let p2p_service = {
if config.reuse_port_on_linux {
let iter = config.listen_addresses.iter();

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum TransportType {
Ws,
Tcp,
}

fn find_type(addr: &Multiaddr) -> TransportType {
let mut iter = addr.iter();

iter.find_map(|proto| {
if let p2p::multiaddr::Protocol::Ws = proto {
Some(TransportType::Ws)
} else {
None
}
})
.unwrap_or(TransportType::Tcp)
}

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum BindType {
None,
Expand All @@ -947,38 +936,45 @@ impl NetworkService {
}

let mut init = BindType::None;
for addr in iter {
for multi_addr in iter {
if init.is_ready() {
break;
}
match find_type(addr) {
match find_type(multi_addr) {
// wait ckb enable ws support
TransportType::Ws => (),
TransportType::Tcp => {
TransportType::Tcp | TransportType::Ws => {
// only bind once
if matches!(init, BindType::Tcp) {
if matches!(init, BindType::Tcp) || matches!(init, BindType::Ws) {
continue;
}
if let Some(addr) = multiaddr_to_socketaddr(addr) {
if let Some(addr) = multiaddr_to_socketaddr(multi_addr) {
use p2p::service::TcpSocket;
let domain = socket2::Domain::for_address(addr);
service_builder =
service_builder.tcp_config(move |socket: TcpSocket| {
let socket_ref = socket2::SockRef::from(&socket);
#[cfg(all(
unix,
not(target_os = "solaris"),
not(target_os = "illumos")
))]
socket_ref.set_reuse_port(true)?;

socket_ref.set_reuse_address(true)?;
if socket_ref.domain()? == domain {
socket_ref.bind(&addr.into())?;
}
Ok(socket)
});
init.transform(TransportType::Tcp)
let bind_fn = move |socket: TcpSocket| {
let socket_ref = socket2::SockRef::from(&socket);
#[cfg(all(
unix,
not(target_os = "solaris"),
not(target_os = "illumos")
))]
socket_ref.set_reuse_port(true)?;

socket_ref.set_reuse_address(true)?;
if socket_ref.domain()? == domain {
socket_ref.bind(&addr.into())?;
}
Ok(socket)
};
service_builder = match find_type(multi_addr) {
TransportType::Tcp => {
init.transform(TransportType::Tcp);
service_builder.tcp_config(bind_fn)
}
TransportType::Ws => {
init.transform(TransportType::Ws);
service_builder.tcp_config_on_ws(bind_fn)
}
};
}
}
}
Expand Down Expand Up @@ -1094,11 +1090,13 @@ impl NetworkService {
.unzip();

let receiver: CancellationToken = new_tokio_exit_rx();
#[cfg(not(target_family = "wasm"))]
let (start_sender, start_receiver) = mpsc::channel();
{
let network_state = Arc::clone(&network_state);
let p2p_control: ServiceAsyncControl = p2p_control.clone().into();
handle.spawn_task(async move {
#[cfg(not(target_family = "wasm"))]
for addr in &config.listen_addresses {
match p2p_service.listen(addr.to_owned()).await {
Ok(listen_address) => {
Expand All @@ -1121,8 +1119,9 @@ impl NetworkService {
}
};
}
#[cfg(not(target_family = "wasm"))]
start_sender.send(Ok(())).unwrap();
tokio::spawn(async move { p2p_service.run().await });
p2p::runtime::spawn(async move { p2p_service.run().await });
tokio::select! {
_ = receiver.cancelled() => {
info!("NetworkService receive exit signal, start shutdown...");
Expand Down Expand Up @@ -1150,7 +1149,7 @@ impl NetworkService {
}
});
}

#[cfg(not(target_family = "wasm"))]
if let Ok(Err(e)) = start_receiver.recv() {
return Err(e);
}
Expand Down Expand Up @@ -1419,3 +1418,22 @@ pub(crate) async fn async_disconnect_with_message(
}
control.disconnect(peer_index).await
}

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TransportType {
Ws,
Tcp,
}

pub fn find_type(addr: &Multiaddr) -> TransportType {
let mut iter = addr.iter();

iter.find_map(|proto| {
if let p2p::multiaddr::Protocol::Ws = proto {
Some(TransportType::Ws)
} else {
None
}
})
.unwrap_or(TransportType::Tcp)
}
3 changes: 3 additions & 0 deletions network/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ use crate::{
};
use p2p::SessionId;
use std::collections::HashMap;
#[cfg(not(target_family = "wasm"))]
use std::time::{Duration, Instant};
#[cfg(all(target_family = "wasm", target_os = "unknown"))]
use web_time::{Duration, Instant};

/// Peer info from identify protocol message
#[derive(Clone, Debug)]
Expand Down
6 changes: 5 additions & 1 deletion network/src/peer_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ use p2p::{multiaddr::Multiaddr, SessionId};
use rand::seq::SliceRandom;
use rand::thread_rng;
use std::collections::{HashMap, HashSet};
#[cfg(not(target_family = "wasm"))]
use std::time::Instant;
#[cfg(all(target_family = "wasm", target_os = "unknown"))]
use web_time::Instant;

pub(crate) const EVICTION_PROTECT_PEERS: usize = 8;

Expand Down Expand Up @@ -146,7 +150,7 @@ impl PeerRegistry {
&mut candidate_peers,
EVICTION_PROTECT_PEERS,
|peer1, peer2| {
let now = std::time::Instant::now();
let now = Instant::now();
let peer1_last_message = peer1
.last_ping_protocol_message_received_at
.map(|t| now.saturating_duration_since(t).as_secs())
Expand Down
Loading

0 comments on commit 7745dd2

Please sign in to comment.