From 9b7b85a5d8c85544193e5ed85429c0cb855d52d6 Mon Sep 17 00:00:00 2001 From: Qian Linfeng Date: Tue, 5 Mar 2019 16:35:11 +0800 Subject: [PATCH 01/17] Add identify protocol flatbuffers --- Cargo.toml | 2 +- identify/Cargo.toml | 10 + identify/src/gen_proto.sh | 4 + identify/src/lib.rs | 7 + identify/src/protocol.fbs | 22 ++ identify/src/protocol_generated.rs | 429 +++++++++++++++++++++++++++++ 6 files changed, 473 insertions(+), 1 deletion(-) create mode 100644 identify/Cargo.toml create mode 100755 identify/src/gen_proto.sh create mode 100644 identify/src/lib.rs create mode 100644 identify/src/protocol.fbs create mode 100644 identify/src/protocol_generated.rs diff --git a/Cargo.toml b/Cargo.toml index 610eb818..fb6daf09 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,4 +35,4 @@ ping = { path = "ping", package = "tentacle-ping" } generic-channel = { version = "0.2.0", features = ["all"] } [workspace] -members = ["yamux", "secio", "discovery", "ping", "bench"] +members = ["yamux", "secio", "discovery", "identify", "ping", "bench"] diff --git a/identify/Cargo.toml b/identify/Cargo.toml new file mode 100644 index 00000000..7a0d19b5 --- /dev/null +++ b/identify/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "identify" +version = "0.1.0" +authors = ["Qian Linfeng "] +edition = "2018" + +[dependencies] +p2p = { path = "..", package = "tentacle" } +bytes = "0.4" +flatbuffers = "0.5.0" \ No newline at end of file diff --git a/identify/src/gen_proto.sh b/identify/src/gen_proto.sh new file mode 100755 index 00000000..e41621cc --- /dev/null +++ b/identify/src/gen_proto.sh @@ -0,0 +1,4 @@ +#! /bin/sh +# flatc version 1.10.0 + +flatc --rust protocol.fbs diff --git a/identify/src/lib.rs b/identify/src/lib.rs new file mode 100644 index 00000000..4eab2f5a --- /dev/null +++ b/identify/src/lib.rs @@ -0,0 +1,7 @@ + + +pub struct Identify { +} + +pub enum IdentifyMessage { +} diff --git a/identify/src/protocol.fbs b/identify/src/protocol.fbs new file mode 100644 index 00000000..768e0f84 --- /dev/null +++ b/identify/src/protocol.fbs @@ -0,0 +1,22 @@ +namespace P2P.Identify; + +union IdentifyPayload { + ListenAddrs, + ObservedAddr, +} + +table IdentifyMessage { + payload: IdentifyPayload; +} + +table ListenAddrs { + addrs: [Bytes]; +} + +table ObservedAddr { + addr: Bytes; +} + +table Bytes { + seq: [ubyte]; +} \ No newline at end of file diff --git a/identify/src/protocol_generated.rs b/identify/src/protocol_generated.rs new file mode 100644 index 00000000..89d5f02c --- /dev/null +++ b/identify/src/protocol_generated.rs @@ -0,0 +1,429 @@ +// automatically generated by the FlatBuffers compiler, do not modify + + +#![allow(dead_code)] +#![allow(unused_imports)] +extern crate flatbuffers; + +pub mod p2p { + #![allow(dead_code)] + #![allow(unused_imports)] + + use std::mem; + use std::cmp::Ordering; + + extern crate flatbuffers; + use self::flatbuffers::EndianScalar; +pub mod identify { + #![allow(dead_code)] + #![allow(unused_imports)] + + use std::mem; + use std::cmp::Ordering; + + extern crate flatbuffers; + use self::flatbuffers::EndianScalar; + +#[allow(non_camel_case_types)] +#[repr(u8)] +#[derive(Clone, Copy, PartialEq, Debug)] +pub enum IdentifyPayload { + NONE = 0, + ListenAddrs = 1, + ObservedAddr = 2, + +} + +const ENUM_MIN_IDENTIFY_PAYLOAD: u8 = 0; +const ENUM_MAX_IDENTIFY_PAYLOAD: u8 = 2; + +impl<'a> flatbuffers::Follow<'a> for IdentifyPayload { + type Inner = Self; + #[inline] + fn follow(buf: &'a [u8], loc: usize) -> Self::Inner { + flatbuffers::read_scalar_at::(buf, loc) + } +} + +impl flatbuffers::EndianScalar for IdentifyPayload { + #[inline] + fn to_little_endian(self) -> Self { + let n = u8::to_le(self as u8); + let p = &n as *const u8 as *const IdentifyPayload; + unsafe { *p } + } + #[inline] + fn from_little_endian(self) -> Self { + let n = u8::from_le(self as u8); + let p = &n as *const u8 as *const IdentifyPayload; + unsafe { *p } + } +} + +impl flatbuffers::Push for IdentifyPayload { + type Output = IdentifyPayload; + #[inline] + fn push(&self, dst: &mut [u8], _rest: &[u8]) { + flatbuffers::emplace_scalar::(dst, *self); + } +} + +#[allow(non_camel_case_types)] +const ENUM_VALUES_IDENTIFY_PAYLOAD:[IdentifyPayload; 3] = [ + IdentifyPayload::NONE, + IdentifyPayload::ListenAddrs, + IdentifyPayload::ObservedAddr +]; + +#[allow(non_camel_case_types)] +const ENUM_NAMES_IDENTIFY_PAYLOAD:[&'static str; 3] = [ + "NONE", + "ListenAddrs", + "ObservedAddr" +]; + +pub fn enum_name_identify_payload(e: IdentifyPayload) -> &'static str { + let index: usize = e as usize; + ENUM_NAMES_IDENTIFY_PAYLOAD[index] +} + +pub struct IdentifyPayloadUnionTableOffset {} +pub enum IdentifyMessageOffset {} +#[derive(Copy, Clone, Debug, PartialEq)] + +pub struct IdentifyMessage<'a> { + pub _tab: flatbuffers::Table<'a>, +} + +impl<'a> flatbuffers::Follow<'a> for IdentifyMessage<'a> { + type Inner = IdentifyMessage<'a>; + #[inline] + fn follow(buf: &'a [u8], loc: usize) -> Self::Inner { + Self { + _tab: flatbuffers::Table { buf: buf, loc: loc }, + } + } +} + +impl<'a> IdentifyMessage<'a> { + #[inline] + pub fn init_from_table(table: flatbuffers::Table<'a>) -> Self { + IdentifyMessage { + _tab: table, + } + } + #[allow(unused_mut)] + pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr>( + _fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr>, + args: &'args IdentifyMessageArgs) -> flatbuffers::WIPOffset> { + let mut builder = IdentifyMessageBuilder::new(_fbb); + if let Some(x) = args.payload { builder.add_payload(x); } + builder.add_payload_type(args.payload_type); + builder.finish() + } + + pub const VT_PAYLOAD_TYPE: flatbuffers::VOffsetT = 4; + pub const VT_PAYLOAD: flatbuffers::VOffsetT = 6; + + #[inline] + pub fn payload_type(&self) -> IdentifyPayload { + self._tab.get::(IdentifyMessage::VT_PAYLOAD_TYPE, Some(IdentifyPayload::NONE)).unwrap() + } + #[inline] + pub fn payload(&self) -> Option> { + self._tab.get::>>(IdentifyMessage::VT_PAYLOAD, None) + } + #[inline] + #[allow(non_snake_case)] + pub fn payload_as_listen_addrs(&'a self) -> Option { + if self.payload_type() == IdentifyPayload::ListenAddrs { + self.payload().map(|u| ListenAddrs::init_from_table(u)) + } else { + None + } + } + + #[inline] + #[allow(non_snake_case)] + pub fn payload_as_observed_addr(&'a self) -> Option { + if self.payload_type() == IdentifyPayload::ObservedAddr { + self.payload().map(|u| ObservedAddr::init_from_table(u)) + } else { + None + } + } + +} + +pub struct IdentifyMessageArgs { + pub payload_type: IdentifyPayload, + pub payload: Option>, +} +impl<'a> Default for IdentifyMessageArgs { + #[inline] + fn default() -> Self { + IdentifyMessageArgs { + payload_type: IdentifyPayload::NONE, + payload: None, + } + } +} +pub struct IdentifyMessageBuilder<'a: 'b, 'b> { + fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a>, + start_: flatbuffers::WIPOffset, +} +impl<'a: 'b, 'b> IdentifyMessageBuilder<'a, 'b> { + #[inline] + pub fn add_payload_type(&mut self, payload_type: IdentifyPayload) { + self.fbb_.push_slot::(IdentifyMessage::VT_PAYLOAD_TYPE, payload_type, IdentifyPayload::NONE); + } + #[inline] + pub fn add_payload(&mut self, payload: flatbuffers::WIPOffset) { + self.fbb_.push_slot_always::>(IdentifyMessage::VT_PAYLOAD, payload); + } + #[inline] + pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a>) -> IdentifyMessageBuilder<'a, 'b> { + let start = _fbb.start_table(); + IdentifyMessageBuilder { + fbb_: _fbb, + start_: start, + } + } + #[inline] + pub fn finish(self) -> flatbuffers::WIPOffset> { + let o = self.fbb_.end_table(self.start_); + flatbuffers::WIPOffset::new(o.value()) + } +} + +pub enum ListenAddrsOffset {} +#[derive(Copy, Clone, Debug, PartialEq)] + +pub struct ListenAddrs<'a> { + pub _tab: flatbuffers::Table<'a>, +} + +impl<'a> flatbuffers::Follow<'a> for ListenAddrs<'a> { + type Inner = ListenAddrs<'a>; + #[inline] + fn follow(buf: &'a [u8], loc: usize) -> Self::Inner { + Self { + _tab: flatbuffers::Table { buf: buf, loc: loc }, + } + } +} + +impl<'a> ListenAddrs<'a> { + #[inline] + pub fn init_from_table(table: flatbuffers::Table<'a>) -> Self { + ListenAddrs { + _tab: table, + } + } + #[allow(unused_mut)] + pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr>( + _fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr>, + args: &'args ListenAddrsArgs<'args>) -> flatbuffers::WIPOffset> { + let mut builder = ListenAddrsBuilder::new(_fbb); + if let Some(x) = args.addrs { builder.add_addrs(x); } + builder.finish() + } + + pub const VT_ADDRS: flatbuffers::VOffsetT = 4; + + #[inline] + pub fn addrs(&self) -> Option>>> { + self._tab.get::>>>>(ListenAddrs::VT_ADDRS, None) + } +} + +pub struct ListenAddrsArgs<'a> { + pub addrs: Option>>>>, +} +impl<'a> Default for ListenAddrsArgs<'a> { + #[inline] + fn default() -> Self { + ListenAddrsArgs { + addrs: None, + } + } +} +pub struct ListenAddrsBuilder<'a: 'b, 'b> { + fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a>, + start_: flatbuffers::WIPOffset, +} +impl<'a: 'b, 'b> ListenAddrsBuilder<'a, 'b> { + #[inline] + pub fn add_addrs(&mut self, addrs: flatbuffers::WIPOffset>>>) { + self.fbb_.push_slot_always::>(ListenAddrs::VT_ADDRS, addrs); + } + #[inline] + pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a>) -> ListenAddrsBuilder<'a, 'b> { + let start = _fbb.start_table(); + ListenAddrsBuilder { + fbb_: _fbb, + start_: start, + } + } + #[inline] + pub fn finish(self) -> flatbuffers::WIPOffset> { + let o = self.fbb_.end_table(self.start_); + flatbuffers::WIPOffset::new(o.value()) + } +} + +pub enum ObservedAddrOffset {} +#[derive(Copy, Clone, Debug, PartialEq)] + +pub struct ObservedAddr<'a> { + pub _tab: flatbuffers::Table<'a>, +} + +impl<'a> flatbuffers::Follow<'a> for ObservedAddr<'a> { + type Inner = ObservedAddr<'a>; + #[inline] + fn follow(buf: &'a [u8], loc: usize) -> Self::Inner { + Self { + _tab: flatbuffers::Table { buf: buf, loc: loc }, + } + } +} + +impl<'a> ObservedAddr<'a> { + #[inline] + pub fn init_from_table(table: flatbuffers::Table<'a>) -> Self { + ObservedAddr { + _tab: table, + } + } + #[allow(unused_mut)] + pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr>( + _fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr>, + args: &'args ObservedAddrArgs<'args>) -> flatbuffers::WIPOffset> { + let mut builder = ObservedAddrBuilder::new(_fbb); + if let Some(x) = args.addr { builder.add_addr(x); } + builder.finish() + } + + pub const VT_ADDR: flatbuffers::VOffsetT = 4; + + #[inline] + pub fn addr(&self) -> Option> { + self._tab.get::>>(ObservedAddr::VT_ADDR, None) + } +} + +pub struct ObservedAddrArgs<'a> { + pub addr: Option>>, +} +impl<'a> Default for ObservedAddrArgs<'a> { + #[inline] + fn default() -> Self { + ObservedAddrArgs { + addr: None, + } + } +} +pub struct ObservedAddrBuilder<'a: 'b, 'b> { + fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a>, + start_: flatbuffers::WIPOffset, +} +impl<'a: 'b, 'b> ObservedAddrBuilder<'a, 'b> { + #[inline] + pub fn add_addr(&mut self, addr: flatbuffers::WIPOffset>) { + self.fbb_.push_slot_always::>(ObservedAddr::VT_ADDR, addr); + } + #[inline] + pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a>) -> ObservedAddrBuilder<'a, 'b> { + let start = _fbb.start_table(); + ObservedAddrBuilder { + fbb_: _fbb, + start_: start, + } + } + #[inline] + pub fn finish(self) -> flatbuffers::WIPOffset> { + let o = self.fbb_.end_table(self.start_); + flatbuffers::WIPOffset::new(o.value()) + } +} + +pub enum BytesOffset {} +#[derive(Copy, Clone, Debug, PartialEq)] + +pub struct Bytes<'a> { + pub _tab: flatbuffers::Table<'a>, +} + +impl<'a> flatbuffers::Follow<'a> for Bytes<'a> { + type Inner = Bytes<'a>; + #[inline] + fn follow(buf: &'a [u8], loc: usize) -> Self::Inner { + Self { + _tab: flatbuffers::Table { buf: buf, loc: loc }, + } + } +} + +impl<'a> Bytes<'a> { + #[inline] + pub fn init_from_table(table: flatbuffers::Table<'a>) -> Self { + Bytes { + _tab: table, + } + } + #[allow(unused_mut)] + pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr>( + _fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr>, + args: &'args BytesArgs<'args>) -> flatbuffers::WIPOffset> { + let mut builder = BytesBuilder::new(_fbb); + if let Some(x) = args.seq { builder.add_seq(x); } + builder.finish() + } + + pub const VT_SEQ: flatbuffers::VOffsetT = 4; + + #[inline] + pub fn seq(&self) -> Option<&'a [u8]> { + self._tab.get::>>(Bytes::VT_SEQ, None).map(|v| v.safe_slice()) + } +} + +pub struct BytesArgs<'a> { + pub seq: Option>>, +} +impl<'a> Default for BytesArgs<'a> { + #[inline] + fn default() -> Self { + BytesArgs { + seq: None, + } + } +} +pub struct BytesBuilder<'a: 'b, 'b> { + fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a>, + start_: flatbuffers::WIPOffset, +} +impl<'a: 'b, 'b> BytesBuilder<'a, 'b> { + #[inline] + pub fn add_seq(&mut self, seq: flatbuffers::WIPOffset>) { + self.fbb_.push_slot_always::>(Bytes::VT_SEQ, seq); + } + #[inline] + pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a>) -> BytesBuilder<'a, 'b> { + let start = _fbb.start_table(); + BytesBuilder { + fbb_: _fbb, + start_: start, + } + } + #[inline] + pub fn finish(self) -> flatbuffers::WIPOffset> { + let o = self.fbb_.end_table(self.start_); + flatbuffers::WIPOffset::new(o.value()) + } +} + +} // pub mod Identify +} // pub mod P2P + From a2bdcc207d6073bf388456e441759402a3268fb0 Mon Sep 17 00:00:00 2001 From: Qian Linfeng Date: Tue, 5 Mar 2019 19:28:26 +0800 Subject: [PATCH 02/17] Add flatbuffers encode decode functions --- identify/src/lib.rs | 10 +-- identify/src/message.rs | 107 +++++++++++++++++++++++++++++ identify/src/protocol.fbs | 11 +-- identify/src/protocol_generated.rs | 78 ++++++++++++--------- 4 files changed, 163 insertions(+), 43 deletions(-) create mode 100644 identify/src/message.rs diff --git a/identify/src/lib.rs b/identify/src/lib.rs index 4eab2f5a..09fc8eb5 100644 --- a/identify/src/lib.rs +++ b/identify/src/lib.rs @@ -1,7 +1,7 @@ +#[rustfmt::skip] +#[allow(clippy::all)] +mod protocol_generated; +mod message; -pub struct Identify { -} - -pub enum IdentifyMessage { -} +pub struct Identify {} diff --git a/identify/src/message.rs b/identify/src/message.rs new file mode 100644 index 00000000..f0c1ea01 --- /dev/null +++ b/identify/src/message.rs @@ -0,0 +1,107 @@ +use std::net::{IpAddr, SocketAddr}; + +use flatbuffers::{get_root, FlatBufferBuilder, WIPOffset}; + +use crate::protocol_generated::p2p::identify::{ + Address as FbsAddress, AddressBuilder, IdentifyMessage as FbsIdentifyMessage, + IdentifyMessageBuilder, IdentifyPayload as FbsIdentifyPayload, ListenAddrs as FbsListenAddrs, + ListenAddrsBuilder, ObservedAddr as FbsObservedAddr, ObservedAddrBuilder, +}; + +#[derive(Clone, PartialEq, Eq, Debug)] +pub enum IdentifyMessage { + ListenAddrs(Vec), + ObservedAddr(SocketAddr), +} + +impl IdentifyMessage { + pub fn encode(&self) -> Vec { + let mut fbb = FlatBufferBuilder::new(); + let offset = match self { + IdentifyMessage::ListenAddrs(addrs) => { + let mut vec_addrs = Vec::new(); + for addr in addrs { + vec_addrs.push(addr_to_offset(&mut fbb, addr)); + } + let fbs_addrs = fbb.create_vector(&vec_addrs); + let mut listen_addrs_builder = ListenAddrsBuilder::new(&mut fbb); + listen_addrs_builder.add_addrs(fbs_addrs); + let listen_addrs = listen_addrs_builder.finish(); + + let mut builder = IdentifyMessageBuilder::new(&mut fbb); + builder.add_payload_type(FbsIdentifyPayload::ListenAddrs); + builder.add_payload(listen_addrs.as_union_value()); + builder.finish() + } + IdentifyMessage::ObservedAddr(addr) => { + let addr_offset = addr_to_offset(&mut fbb, &addr); + let mut observed_addr_builder = ObservedAddrBuilder::new(&mut fbb); + observed_addr_builder.add_addr(addr_offset); + let observed_addr = observed_addr_builder.finish(); + + let mut builder = IdentifyMessageBuilder::new(&mut fbb); + builder.add_payload_type(FbsIdentifyPayload::ObservedAddr); + builder.add_payload(observed_addr.as_union_value()); + builder.finish() + } + }; + fbb.finish(offset, None); + fbb.finished_data().to_vec() + } + + pub fn decode(data: &[u8]) -> Option { + let fbs_message = get_root::(data); + let payload = fbs_message.payload()?; + match fbs_message.payload_type() { + FbsIdentifyPayload::ListenAddrs => { + let fbs_listen_addrs = FbsListenAddrs::init_from_table(payload); + let fbs_addrs = fbs_listen_addrs.addrs()?; + let mut addrs = Vec::new(); + for i in 0..fbs_addrs.len() { + let addr = fbs_addrs.get(i); + addrs.push(fbs_to_addr(&addr)?); + } + Some(IdentifyMessage::ListenAddrs(addrs)) + } + FbsIdentifyPayload::ObservedAddr => { + let fbs_observed_addr = FbsObservedAddr::init_from_table(payload); + let fbs_addr = fbs_observed_addr.addr()?; + let addr = fbs_to_addr(&fbs_addr)?; + Some(IdentifyMessage::ObservedAddr(addr)) + } + _ => None, + } + } +} + +fn addr_to_offset<'b>( + fbb: &mut FlatBufferBuilder<'b>, + addr: &SocketAddr, +) -> WIPOffset> { + let ip = match addr.ip() { + IpAddr::V4(ipv4) => fbb.create_vector(&ipv4.octets()), + IpAddr::V6(ipv6) => fbb.create_vector(&ipv6.octets()), + }; + let mut addr_builder = AddressBuilder::new(fbb); + addr_builder.add_ip(ip); + addr_builder.add_port(addr.port()); + addr_builder.finish() +} + +fn fbs_to_addr(addr: &FbsAddress) -> Option { + let ip_bytes = addr.ip()?; + let ip_addr = match ip_bytes.len() { + 4 => { + let mut data = [0u8; 4]; + data.copy_from_slice(ip_bytes); + Some(IpAddr::from(data)) + } + 16 => { + let mut data = [0u8; 16]; + data.copy_from_slice(ip_bytes); + Some(IpAddr::from(data)) + } + _ => None, + }; + ip_addr.map(|ip| SocketAddr::new(ip, addr.port())) +} diff --git a/identify/src/protocol.fbs b/identify/src/protocol.fbs index 768e0f84..7be34f22 100644 --- a/identify/src/protocol.fbs +++ b/identify/src/protocol.fbs @@ -10,13 +10,14 @@ table IdentifyMessage { } table ListenAddrs { - addrs: [Bytes]; + addrs: [Address]; } table ObservedAddr { - addr: Bytes; + addr: Address; } -table Bytes { - seq: [ubyte]; -} \ No newline at end of file +table Address { + ip: [ubyte]; + port: ushort; +} diff --git a/identify/src/protocol_generated.rs b/identify/src/protocol_generated.rs index 89d5f02c..7b5657e7 100644 --- a/identify/src/protocol_generated.rs +++ b/identify/src/protocol_generated.rs @@ -232,13 +232,13 @@ impl<'a> ListenAddrs<'a> { pub const VT_ADDRS: flatbuffers::VOffsetT = 4; #[inline] - pub fn addrs(&self) -> Option>>> { - self._tab.get::>>>>(ListenAddrs::VT_ADDRS, None) + pub fn addrs(&self) -> Option>>> { + self._tab.get::>>>>(ListenAddrs::VT_ADDRS, None) } } pub struct ListenAddrsArgs<'a> { - pub addrs: Option>>>>, + pub addrs: Option>>>>, } impl<'a> Default for ListenAddrsArgs<'a> { #[inline] @@ -254,7 +254,7 @@ pub struct ListenAddrsBuilder<'a: 'b, 'b> { } impl<'a: 'b, 'b> ListenAddrsBuilder<'a, 'b> { #[inline] - pub fn add_addrs(&mut self, addrs: flatbuffers::WIPOffset>>>) { + pub fn add_addrs(&mut self, addrs: flatbuffers::WIPOffset>>>) { self.fbb_.push_slot_always::>(ListenAddrs::VT_ADDRS, addrs); } #[inline] @@ -308,13 +308,13 @@ impl<'a> ObservedAddr<'a> { pub const VT_ADDR: flatbuffers::VOffsetT = 4; #[inline] - pub fn addr(&self) -> Option> { - self._tab.get::>>(ObservedAddr::VT_ADDR, None) + pub fn addr(&self) -> Option> { + self._tab.get::>>(ObservedAddr::VT_ADDR, None) } } pub struct ObservedAddrArgs<'a> { - pub addr: Option>>, + pub addr: Option>>, } impl<'a> Default for ObservedAddrArgs<'a> { #[inline] @@ -330,8 +330,8 @@ pub struct ObservedAddrBuilder<'a: 'b, 'b> { } impl<'a: 'b, 'b> ObservedAddrBuilder<'a, 'b> { #[inline] - pub fn add_addr(&mut self, addr: flatbuffers::WIPOffset>) { - self.fbb_.push_slot_always::>(ObservedAddr::VT_ADDR, addr); + pub fn add_addr(&mut self, addr: flatbuffers::WIPOffset>) { + self.fbb_.push_slot_always::>(ObservedAddr::VT_ADDR, addr); } #[inline] pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a>) -> ObservedAddrBuilder<'a, 'b> { @@ -348,15 +348,15 @@ impl<'a: 'b, 'b> ObservedAddrBuilder<'a, 'b> { } } -pub enum BytesOffset {} +pub enum AddressOffset {} #[derive(Copy, Clone, Debug, PartialEq)] -pub struct Bytes<'a> { +pub struct Address<'a> { pub _tab: flatbuffers::Table<'a>, } -impl<'a> flatbuffers::Follow<'a> for Bytes<'a> { - type Inner = Bytes<'a>; +impl<'a> flatbuffers::Follow<'a> for Address<'a> { + type Inner = Address<'a>; #[inline] fn follow(buf: &'a [u8], loc: usize) -> Self::Inner { Self { @@ -365,60 +365,72 @@ impl<'a> flatbuffers::Follow<'a> for Bytes<'a> { } } -impl<'a> Bytes<'a> { +impl<'a> Address<'a> { #[inline] pub fn init_from_table(table: flatbuffers::Table<'a>) -> Self { - Bytes { + Address { _tab: table, } } #[allow(unused_mut)] pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr>( _fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr>, - args: &'args BytesArgs<'args>) -> flatbuffers::WIPOffset> { - let mut builder = BytesBuilder::new(_fbb); - if let Some(x) = args.seq { builder.add_seq(x); } + args: &'args AddressArgs<'args>) -> flatbuffers::WIPOffset> { + let mut builder = AddressBuilder::new(_fbb); + if let Some(x) = args.ip { builder.add_ip(x); } + builder.add_port(args.port); builder.finish() } - pub const VT_SEQ: flatbuffers::VOffsetT = 4; + pub const VT_IP: flatbuffers::VOffsetT = 4; + pub const VT_PORT: flatbuffers::VOffsetT = 6; #[inline] - pub fn seq(&self) -> Option<&'a [u8]> { - self._tab.get::>>(Bytes::VT_SEQ, None).map(|v| v.safe_slice()) + pub fn ip(&self) -> Option<&'a [u8]> { + self._tab.get::>>(Address::VT_IP, None).map(|v| v.safe_slice()) + } + #[inline] + pub fn port(&self) -> u16 { + self._tab.get::(Address::VT_PORT, Some(0)).unwrap() } } -pub struct BytesArgs<'a> { - pub seq: Option>>, +pub struct AddressArgs<'a> { + pub ip: Option>>, + pub port: u16, } -impl<'a> Default for BytesArgs<'a> { +impl<'a> Default for AddressArgs<'a> { #[inline] fn default() -> Self { - BytesArgs { - seq: None, + AddressArgs { + ip: None, + port: 0, } } } -pub struct BytesBuilder<'a: 'b, 'b> { +pub struct AddressBuilder<'a: 'b, 'b> { fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a>, start_: flatbuffers::WIPOffset, } -impl<'a: 'b, 'b> BytesBuilder<'a, 'b> { +impl<'a: 'b, 'b> AddressBuilder<'a, 'b> { + #[inline] + pub fn add_ip(&mut self, ip: flatbuffers::WIPOffset>) { + self.fbb_.push_slot_always::>(Address::VT_IP, ip); + } #[inline] - pub fn add_seq(&mut self, seq: flatbuffers::WIPOffset>) { - self.fbb_.push_slot_always::>(Bytes::VT_SEQ, seq); + pub fn add_port(&mut self, port: u16) { + self.fbb_.push_slot::(Address::VT_PORT, port, 0); } #[inline] - pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a>) -> BytesBuilder<'a, 'b> { + pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a>) -> AddressBuilder<'a, 'b> { let start = _fbb.start_table(); - BytesBuilder { + AddressBuilder { fbb_: _fbb, start_: start, } } #[inline] - pub fn finish(self) -> flatbuffers::WIPOffset> { + pub fn finish(self) -> flatbuffers::WIPOffset> { let o = self.fbb_.end_table(self.start_); flatbuffers::WIPOffset::new(o.value()) } From 7fdec5d7825b89e2b9b87887e143e4599419b632 Mon Sep 17 00:00:00 2001 From: Qian Linfeng Date: Wed, 6 Mar 2019 10:51:10 +0800 Subject: [PATCH 03/17] Rename message.x to protocol.x --- discovery/src/gen_proto.sh | 2 +- discovery/src/lib.rs | 6 +++--- discovery/src/{message.fbs => protocol.fbs} | 0 discovery/src/{message.rs => protocol.rs} | 2 +- .../src/{message_generated.rs => protocol_generated.rs} | 0 discovery/src/substream.rs | 2 +- 6 files changed, 6 insertions(+), 6 deletions(-) rename discovery/src/{message.fbs => protocol.fbs} (100%) rename discovery/src/{message.rs => protocol.rs} (99%) rename discovery/src/{message_generated.rs => protocol_generated.rs} (100%) diff --git a/discovery/src/gen_proto.sh b/discovery/src/gen_proto.sh index f7d724bd..e41621cc 100755 --- a/discovery/src/gen_proto.sh +++ b/discovery/src/gen_proto.sh @@ -1,4 +1,4 @@ #! /bin/sh # flatc version 1.10.0 -flatc --rust message.fbs +flatc --rust protocol.fbs diff --git a/discovery/src/lib.rs b/discovery/src/lib.rs index cef20cc5..2dab6b09 100644 --- a/discovery/src/lib.rs +++ b/discovery/src/lib.rs @@ -18,16 +18,16 @@ use p2p::{ use rand::seq::SliceRandom; mod addr; -mod message; +mod protocol; mod substream; #[rustfmt::skip] #[allow(clippy::all)] -mod message_generated; +mod protocol_generated; pub use crate::{ addr::{AddrKnown, AddressManager, MisbehaveResult, Misbehavior, RawAddr}, - message::{DiscoveryMessage, Node, Nodes}, + protocol::{DiscoveryMessage, Node, Nodes}, substream::{Direction, Substream, SubstreamKey, SubstreamValue}, }; diff --git a/discovery/src/message.fbs b/discovery/src/protocol.fbs similarity index 100% rename from discovery/src/message.fbs rename to discovery/src/protocol.fbs diff --git a/discovery/src/message.rs b/discovery/src/protocol.rs similarity index 99% rename from discovery/src/message.rs rename to discovery/src/protocol.rs index d24bbb08..87dfb966 100644 --- a/discovery/src/message.rs +++ b/discovery/src/protocol.rs @@ -7,7 +7,7 @@ use tokio::codec::length_delimited::LengthDelimitedCodec; use tokio::codec::{Decoder, Encoder}; use crate::addr::RawAddr; -use crate::message_generated::p2p::discovery::{ +use crate::protocol_generated::p2p::discovery::{ BytesBuilder, DiscoveryMessage as FbsDiscoveryMessage, DiscoveryMessageBuilder, DiscoveryPayload as FbsDiscoveryPayload, GetNodes as FbsGetNodes, GetNodesBuilder, NodeBuilder, Nodes as FbsNodes, NodesBuilder, diff --git a/discovery/src/message_generated.rs b/discovery/src/protocol_generated.rs similarity index 100% rename from discovery/src/message_generated.rs rename to discovery/src/protocol_generated.rs diff --git a/discovery/src/substream.rs b/discovery/src/substream.rs index 6ba3dbb7..4331ca93 100644 --- a/discovery/src/substream.rs +++ b/discovery/src/substream.rs @@ -15,7 +15,7 @@ use tokio::io::{AsyncRead, AsyncWrite}; use tokio::timer::Interval; use crate::addr::{AddrKnown, AddressManager, Misbehavior, RawAddr}; -use crate::message::{DiscoveryCodec, DiscoveryMessage, Node, Nodes}; +use crate::protocol::{DiscoveryCodec, DiscoveryMessage, Node, Nodes}; // FIXME: should be a more high level version number const VERSION: u32 = 0; From 52f22b42f459e2a525ad3b3646267f647ed89324 Mon Sep 17 00:00:00 2001 From: Qian Linfeng Date: Wed, 6 Mar 2019 10:57:41 +0800 Subject: [PATCH 04/17] Rename message.rs to protocol.rs --- identify/src/lib.rs | 8 ++++++-- identify/src/{message.rs => protocol.rs} | 4 ++-- 2 files changed, 8 insertions(+), 4 deletions(-) rename identify/src/{message.rs => protocol.rs} (97%) diff --git a/identify/src/lib.rs b/identify/src/lib.rs index 09fc8eb5..5d7ead93 100644 --- a/identify/src/lib.rs +++ b/identify/src/lib.rs @@ -2,6 +2,10 @@ #[allow(clippy::all)] mod protocol_generated; -mod message; +mod protocol; -pub struct Identify {} +pub trait AddrManager { +} + +pub struct Identify { +} diff --git a/identify/src/message.rs b/identify/src/protocol.rs similarity index 97% rename from identify/src/message.rs rename to identify/src/protocol.rs index f0c1ea01..94a52723 100644 --- a/identify/src/message.rs +++ b/identify/src/protocol.rs @@ -15,7 +15,7 @@ pub enum IdentifyMessage { } impl IdentifyMessage { - pub fn encode(&self) -> Vec { + pub(crate) fn encode(&self) -> Vec { let mut fbb = FlatBufferBuilder::new(); let offset = match self { IdentifyMessage::ListenAddrs(addrs) => { @@ -49,7 +49,7 @@ impl IdentifyMessage { fbb.finished_data().to_vec() } - pub fn decode(data: &[u8]) -> Option { + pub(crate) fn decode(data: &[u8]) -> Option { let fbs_message = get_root::(data); let payload = fbs_message.payload()?; match fbs_message.payload_type() { From e82446d28cbd4be36a87ede4e84d8652d43f98fc Mon Sep 17 00:00:00 2001 From: Qian Linfeng Date: Thu, 7 Mar 2019 10:38:49 +0800 Subject: [PATCH 05/17] update --- identify/Cargo.toml | 4 ++- identify/src/lib.rs | 68 +++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 69 insertions(+), 3 deletions(-) diff --git a/identify/Cargo.toml b/identify/Cargo.toml index 7a0d19b5..9498b875 100644 --- a/identify/Cargo.toml +++ b/identify/Cargo.toml @@ -7,4 +7,6 @@ edition = "2018" [dependencies] p2p = { path = "..", package = "tentacle" } bytes = "0.4" -flatbuffers = "0.5.0" \ No newline at end of file +flatbuffers = "0.5.0" +tokio = "0.1" +log = "0.4" \ No newline at end of file diff --git a/identify/src/lib.rs b/identify/src/lib.rs index 5d7ead93..f2050d00 100644 --- a/identify/src/lib.rs +++ b/identify/src/lib.rs @@ -4,8 +4,72 @@ mod protocol_generated; mod protocol; -pub trait AddrManager { +use log::{debug, error}; +use p2p::{ + context::{ServiceContext, SessionContext}, + secio::PeerId, + traits::{ProtocolHandle, ProtocolMeta, ServiceProtocol}, + utils::multiaddr_to_socketaddr, + ProtocolId, SessionId, +}; +use std::net::SocketAddr; +use tokio::codec::length_delimited::LengthDelimitedCodec; + +use protocol::IdentifyMessage; + +pub trait AddrManager {} + +pub struct IdentifyProtocol { + id: ProtocolId, + listen_addrs: Vec, } -pub struct Identify { +impl ProtocolMeta for IdentifyProtocol { + fn id(&self) -> ProtocolId { + self.id + } + + fn codec(&self) -> LengthDelimitedCodec { + LengthDelimitedCodec::new() + } + + fn service_handle(&self) -> ProtocolHandle> { + ProtocolHandle::Empty + } +} + +impl ServiceProtocol for IdentifyProtocol { + fn init(&mut self, service: &mut ServiceContext) { + self.listen_addrs = service + .listens() + .iter() + .map(|addr| multiaddr_to_socketaddr(addr).unwrap()) + .collect(); + } + + fn connected( + &mut self, + service: &mut ServiceContext, + session: &SessionContext, + _version: &str, + ) { + let data = IdentifyMessage::ListenAddrs(self.listen_addrs.clone()).encode(); + service.send_message(session.id, self.id, data); + let remote_addr = + multiaddr_to_socketaddr(&session.address).expect("Can not get remote address"); + let data = IdentifyMessage::ObservedAddr(remote_addr).encode(); + service.send_message(session.id, self.id, data); + } + + fn disconnected(&mut self, _service: &mut ServiceContext, _session: &SessionContext) {} + + fn received( + &mut self, + _service: &mut ServiceContext, + _session: &SessionContext, + _data: bytes::Bytes, + ) { + } + + fn notify(&mut self, _service: &mut ServiceContext, _token: u64) {} } From cb26780768cc4462a8cb8c7dc94c783a28cbfc93 Mon Sep 17 00:00:00 2001 From: Linfeng Qian Date: Thu, 7 Mar 2019 15:09:29 +0800 Subject: [PATCH 06/17] Basic finish identify protocol --- Cargo.toml | 1 + identify/src/lib.rs | 146 +++++++++++++++++++++++++++++++++++++++----- 2 files changed, 132 insertions(+), 15 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index fb6daf09..d45f1297 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,7 @@ crossbeam-channel = "0.3.6" systemstat = "0.1.3" nix = "0.13.0" ping = { path = "ping", package = "tentacle-ping" } +identify = { path = "identify" } generic-channel = { version = "0.2.0", features = ["all"] } [workspace] diff --git a/identify/src/lib.rs b/identify/src/lib.rs index f2050d00..89698e48 100644 --- a/identify/src/lib.rs +++ b/identify/src/lib.rs @@ -4,7 +4,11 @@ mod protocol_generated; mod protocol; -use log::{debug, error}; +use std::collections::{HashMap, HashSet}; +use std::net::SocketAddr; +use std::time::{Duration, Instant}; + +use log::{debug, trace}; use p2p::{ context::{ServiceContext, SessionContext}, secio::PeerId, @@ -12,16 +16,63 @@ use p2p::{ utils::multiaddr_to_socketaddr, ProtocolId, SessionId, }; -use std::net::SocketAddr; use tokio::codec::length_delimited::LengthDelimitedCodec; use protocol::IdentifyMessage; +const CHECK_TIMEOUT_TOKEN: u64 = 100; + pub trait AddrManager {} pub struct IdentifyProtocol { id: ProtocolId, listen_addrs: Vec, + observed_addrs: HashMap, + remote_infos: HashMap, +} + +impl IdentifyProtocol { + pub fn new(id: ProtocolId) -> IdentifyProtocol { + IdentifyProtocol { + id, + listen_addrs: Vec::new(), + observed_addrs: HashMap::default(), + remote_infos: HashMap::default(), + } + } +} + +pub(crate) struct RemoteInfo { + peer_id: PeerId, + + #[allow(dead_code)] + session: SessionContext, + #[allow(dead_code)] + version: String, + + connected_at: Instant, + timeout: Duration, + listen_addrs: Option>, + observed_addr: Option, +} + +impl RemoteInfo { + fn new(session: SessionContext, version: &str, timeout: Duration) -> RemoteInfo { + let peer_id = session + .remote_pubkey + .as_ref() + .map(|key| PeerId::from_public_key(&key)) + .expect("secio must enabled!"); + RemoteInfo { + peer_id, + session, + version: version.to_string(), + connected_at: Instant::now(), + timeout, + listen_addrs: None, + observed_addr: None, + } + } } impl ProtocolMeta for IdentifyProtocol { @@ -34,7 +85,7 @@ impl ProtocolMeta for IdentifyProtocol { } fn service_handle(&self) -> ProtocolHandle> { - ProtocolHandle::Empty + ProtocolHandle::Callback(Box::new(IdentifyProtocol::new(self.id))) } } @@ -45,15 +96,28 @@ impl ServiceProtocol for IdentifyProtocol { .iter() .map(|addr| multiaddr_to_socketaddr(addr).unwrap()) .collect(); + + // TODO: magic number + service.set_service_notify(self.id, Duration::from_secs(1), CHECK_TIMEOUT_TOKEN); } - fn connected( - &mut self, - service: &mut ServiceContext, - session: &SessionContext, - _version: &str, - ) { - let data = IdentifyMessage::ListenAddrs(self.listen_addrs.clone()).encode(); + fn connected(&mut self, service: &mut ServiceContext, session: &SessionContext, version: &str) { + if session.remote_pubkey.is_none() { + panic!("IdentifyProtocol require secio enabled!"); + } + // TODO: magic number + let remote_info = RemoteInfo::new(session.clone(), version, Duration::from_secs(8)); + trace!("IdentifyProtocol sconnected from {:?}", remote_info.peer_id); + self.remote_infos.insert(session.id, remote_info); + + let listen_addrs: HashSet = self + .observed_addrs + .values() + .into_iter() + .chain(self.listen_addrs.iter()) + .map(Clone::clone) + .collect(); + let data = IdentifyMessage::ListenAddrs(listen_addrs.into_iter().collect()).encode(); service.send_message(session.id, self.id, data); let remote_addr = multiaddr_to_socketaddr(&session.address).expect("Can not get remote address"); @@ -61,15 +125,67 @@ impl ServiceProtocol for IdentifyProtocol { service.send_message(session.id, self.id, data); } - fn disconnected(&mut self, _service: &mut ServiceContext, _session: &SessionContext) {} + fn disconnected(&mut self, _service: &mut ServiceContext, session: &SessionContext) { + let info = self + .remote_infos + .remove(&session.id) + .expect("RemoteInfo must exists"); + trace!("IdentifyProtocol disconnected from {:?}", info.peer_id); + } fn received( &mut self, - _service: &mut ServiceContext, - _session: &SessionContext, - _data: bytes::Bytes, + service: &mut ServiceContext, + session: &SessionContext, + data: bytes::Bytes, ) { + let info = self + .remote_infos + .get_mut(&session.id) + .expect("RemoteInfo must exists"); + match IdentifyMessage::decode(&data) { + Some(IdentifyMessage::ListenAddrs(addrs)) => { + if info.listen_addrs.is_some() { + // TODO report misbehavior: repeat send listen_addrs + debug!("remote({:?}) repeat send observed address", info.peer_id); + service.disconnect(session.id); + } else { + trace!("received listen addresses: {:?}", addrs); + info.listen_addrs = Some(addrs); + } + } + Some(IdentifyMessage::ObservedAddr(addr)) => { + if info.observed_addr.is_some() { + // TODO report misbehavior: repeat send listen_addrs + debug!("remote({:?}) repeat send listen addresses", info.peer_id); + service.disconnect(session.id); + } else { + trace!("received observed address: {}", addr); + info.observed_addr = Some(addr); + // TODO how can we trust this address? + self.observed_addrs.insert(info.peer_id.clone(), addr); + } + } + None => { + debug!( + "IdentifyProtocol received invalid data from {:?}", + info.peer_id + ); + // TODO: report misbehavior: invalid data + service.disconnect(session.id); + } + } } - fn notify(&mut self, _service: &mut ServiceContext, _token: u64) {} + fn notify(&mut self, service: &mut ServiceContext, _token: u64) { + for (session_id, info) in self.remote_infos.iter() { + if (info.listen_addrs.is_none() || info.observed_addr.is_none()) + && (info.connected_at + info.timeout) <= Instant::now() + { + // TODO: report timeout + debug!("{:?} receive identify message timeout", info.peer_id); + service.disconnect(*session_id); + } + } + } } From 1552ed990f1a855b1bdf38ab13e169d373e0059d Mon Sep 17 00:00:00 2001 From: Linfeng Qian Date: Thu, 7 Mar 2019 15:09:43 +0800 Subject: [PATCH 07/17] Add identify protocol example --- examples/id.rs | 52 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) create mode 100644 examples/id.rs diff --git a/examples/id.rs b/examples/id.rs new file mode 100644 index 00000000..3ffd7931 --- /dev/null +++ b/examples/id.rs @@ -0,0 +1,52 @@ +use env_logger; +use log::{debug, info}; + +use std::time::Duration; + +use futures::{future::lazy, prelude::*, sync::mpsc::channel}; +use identify::IdentifyProtocol; +use tentacle::{ + builder::ServiceBuilder, + context::ServiceContext, + secio::SecioKeyPair, + service::{ServiceError, ServiceEvent}, + traits::ServiceHandle, +}; + +fn main() { + env_logger::init(); + let protocol = IdentifyProtocol::new(1); + if std::env::args().nth(1) == Some("server".to_string()) { + debug!("Starting server ......"); + let mut service = ServiceBuilder::default() + .insert_protocol(protocol) + .key_pair(SecioKeyPair::secp256k1_generated()) + .forever(true) + .build(SimpleHandler {}); + let _ = service.dial("/ip4/127.0.0.1/tcp/1338".parse().unwrap()); + let _ = service.listen("/ip4/127.0.0.1/tcp/1337".parse().unwrap()); + tokio::run(lazy(|| service.for_each(|_| Ok(())))) + } else { + debug!("Starting client ......"); + let mut service = ServiceBuilder::default() + .insert_protocol(protocol) + .key_pair(SecioKeyPair::secp256k1_generated()) + .forever(true) + .build(SimpleHandler {}); + let _ = service.dial("/ip4/127.0.0.1/tcp/1337".parse().unwrap()); + let _ = service.listen("/ip4/127.0.0.1/tcp/1338".parse().unwrap()); + tokio::run(lazy(|| service.for_each(|_| Ok(())))) + } +} + +struct SimpleHandler {} + +impl ServiceHandle for SimpleHandler { + fn handle_error(&mut self, _env: &mut ServiceContext, error: ServiceError) { + debug!("service error: {:?}", error); + } + + fn handle_event(&mut self, _env: &mut ServiceContext, event: ServiceEvent) { + debug!("service event: {:?}", event); + } +} From 8a2cd2f58cdd92b2d1c3b5591c31e0027db5e853 Mon Sep 17 00:00:00 2001 From: Linfeng Qian Date: Thu, 7 Mar 2019 15:10:46 +0800 Subject: [PATCH 08/17] Fix clippy --- identify/src/lib.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/identify/src/lib.rs b/identify/src/lib.rs index 89698e48..d8dfcda3 100644 --- a/identify/src/lib.rs +++ b/identify/src/lib.rs @@ -113,7 +113,6 @@ impl ServiceProtocol for IdentifyProtocol { let listen_addrs: HashSet = self .observed_addrs .values() - .into_iter() .chain(self.listen_addrs.iter()) .map(Clone::clone) .collect(); From 4a77add501a31cdf0093c7c220653986f75ff836 Mon Sep 17 00:00:00 2001 From: Linfeng Qian Date: Thu, 7 Mar 2019 15:55:48 +0800 Subject: [PATCH 09/17] Fix ci --- examples/id.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/examples/id.rs b/examples/id.rs index 3ffd7931..9c6061e1 100644 --- a/examples/id.rs +++ b/examples/id.rs @@ -1,9 +1,7 @@ use env_logger; -use log::{debug, info}; +use log::debug; -use std::time::Duration; - -use futures::{future::lazy, prelude::*, sync::mpsc::channel}; +use futures::{future::lazy, prelude::*}; use identify::IdentifyProtocol; use tentacle::{ builder::ServiceBuilder, From c1a7bd17943899afa6d9921beac41d0e77370a95 Mon Sep 17 00:00:00 2001 From: Linfeng Qian Date: Thu, 7 Mar 2019 16:46:22 +0800 Subject: [PATCH 10/17] Remove disconnect --- identify/src/lib.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/identify/src/lib.rs b/identify/src/lib.rs index d8dfcda3..5eff116c 100644 --- a/identify/src/lib.rs +++ b/identify/src/lib.rs @@ -147,7 +147,6 @@ impl ServiceProtocol for IdentifyProtocol { if info.listen_addrs.is_some() { // TODO report misbehavior: repeat send listen_addrs debug!("remote({:?}) repeat send observed address", info.peer_id); - service.disconnect(session.id); } else { trace!("received listen addresses: {:?}", addrs); info.listen_addrs = Some(addrs); @@ -157,7 +156,6 @@ impl ServiceProtocol for IdentifyProtocol { if info.observed_addr.is_some() { // TODO report misbehavior: repeat send listen_addrs debug!("remote({:?}) repeat send listen addresses", info.peer_id); - service.disconnect(session.id); } else { trace!("received observed address: {}", addr); info.observed_addr = Some(addr); @@ -171,7 +169,6 @@ impl ServiceProtocol for IdentifyProtocol { info.peer_id ); // TODO: report misbehavior: invalid data - service.disconnect(session.id); } } } @@ -183,7 +180,6 @@ impl ServiceProtocol for IdentifyProtocol { { // TODO: report timeout debug!("{:?} receive identify message timeout", info.peer_id); - service.disconnect(*session_id); } } } From 122f8890bd0ffa693ac3477a29b92ddb6762fd05 Mon Sep 17 00:00:00 2001 From: Qian Linfeng Date: Fri, 8 Mar 2019 12:00:49 +0800 Subject: [PATCH 11/17] Add AddrManager --- examples/id.rs | 21 ++++++- identify/src/lib.rs | 135 ++++++++++++++++++++++++++++++++++++++------ 2 files changed, 138 insertions(+), 18 deletions(-) diff --git a/examples/id.rs b/examples/id.rs index 9c6061e1..6aeae753 100644 --- a/examples/id.rs +++ b/examples/id.rs @@ -1,11 +1,13 @@ use env_logger; use log::debug; +use std::net::SocketAddr; use futures::{future::lazy, prelude::*}; -use identify::IdentifyProtocol; +use identify::{AddrManager, IdentifyProtocol, MisbehaveResult, Misbehavior}; use tentacle::{ builder::ServiceBuilder, context::ServiceContext, + multiaddr::Multiaddr, secio::SecioKeyPair, service::{ServiceError, ServiceEvent}, traits::ServiceHandle, @@ -13,7 +15,8 @@ use tentacle::{ fn main() { env_logger::init(); - let protocol = IdentifyProtocol::new(1); + let addr_mgr = SimpleAddrManager {}; + let protocol = IdentifyProtocol::new(1, addr_mgr); if std::env::args().nth(1) == Some("server".to_string()) { debug!("Starting server ......"); let mut service = ServiceBuilder::default() @@ -37,6 +40,20 @@ fn main() { } } +#[derive(Clone)] +struct SimpleAddrManager {} + +impl AddrManager for SimpleAddrManager { + /// Add remote peer's listen addresses + fn add_listen_addrs(&mut self, _peer_addr: Multiaddr, _addrs: Vec) {} + /// Add our address observed by remote peer + fn add_observed_addr(&mut self, _peer_addr: Multiaddr, _addr: SocketAddr) {} + /// Report misbehavior + fn misbehave(&mut self, _peer_addr: Multiaddr, _kind: Misbehavior) -> MisbehaveResult { + MisbehaveResult::Disconnect + } +} + struct SimpleHandler {} impl ServiceHandle for SimpleHandler { diff --git a/identify/src/lib.rs b/identify/src/lib.rs index 5eff116c..b76c7953 100644 --- a/identify/src/lib.rs +++ b/identify/src/lib.rs @@ -11,6 +11,7 @@ use std::time::{Duration, Instant}; use log::{debug, trace}; use p2p::{ context::{ServiceContext, SessionContext}, + multiaddr::Multiaddr, secio::PeerId, traits::{ProtocolHandle, ProtocolMeta, ServiceProtocol}, utils::multiaddr_to_socketaddr, @@ -21,20 +22,72 @@ use tokio::codec::length_delimited::LengthDelimitedCodec; use protocol::IdentifyMessage; const CHECK_TIMEOUT_TOKEN: u64 = 100; +// Check timeout interval (seconds) +const CHECK_TIMEOUT_INTERVAL: u64 = 1; +const DEFAULT_TIMEOUT: u64 = 8; +const MAX_ADDRS: usize = 10; -pub trait AddrManager {} +/// The misbehavior to report to underlying peer storage +pub enum Misbehavior { + /// Repeat send listen addresses + DuplicateListenAddrs, + /// Repeat send observed address + DuplicateObservedAddr, + /// Timeout reached + Timeout, + /// Remote peer send invalid data + InvalidData, + /// Send too many addresses in listen addresses + TooManyAddresses(usize), +} + +/// Misbehavior report result +pub enum MisbehaveResult { + /// Continue to run + Continue, + /// Disconnect this peer + Disconnect, +} -pub struct IdentifyProtocol { +impl MisbehaveResult { + pub fn is_continue(&self) -> bool { + match self { + MisbehaveResult::Continue => true, + _ => false, + } + } + pub fn is_disconnect(&self) -> bool { + match self { + MisbehaveResult::Disconnect => true, + _ => false, + } + } +} + +/// The trait to communicate with underlying peer storage +pub trait AddrManager: Clone + Send { + /// Add remote peer's listen addresses + fn add_listen_addrs(&mut self, peer_addr: Multiaddr, addrs: Vec); + /// Add our address observed by remote peer + fn add_observed_addr(&mut self, peer_addr: Multiaddr, addr: SocketAddr); + /// Report misbehavior + fn misbehave(&mut self, peer_addr: Multiaddr, kind: Misbehavior) -> MisbehaveResult; +} + +/// Identify protocol +pub struct IdentifyProtocol { id: ProtocolId, + addr_mgr: T, listen_addrs: Vec, observed_addrs: HashMap, remote_infos: HashMap, } -impl IdentifyProtocol { - pub fn new(id: ProtocolId) -> IdentifyProtocol { +impl IdentifyProtocol { + pub fn new(id: ProtocolId, addr_mgr: T) -> IdentifyProtocol { IdentifyProtocol { id, + addr_mgr, listen_addrs: Vec::new(), observed_addrs: HashMap::default(), remote_infos: HashMap::default(), @@ -45,6 +98,7 @@ impl IdentifyProtocol { pub(crate) struct RemoteInfo { peer_id: PeerId, + // TODO: for future usage #[allow(dead_code)] session: SessionContext, #[allow(dead_code)] @@ -75,7 +129,7 @@ impl RemoteInfo { } } -impl ProtocolMeta for IdentifyProtocol { +impl ProtocolMeta for IdentifyProtocol { fn id(&self) -> ProtocolId { self.id } @@ -85,11 +139,14 @@ impl ProtocolMeta for IdentifyProtocol { } fn service_handle(&self) -> ProtocolHandle> { - ProtocolHandle::Callback(Box::new(IdentifyProtocol::new(self.id))) + ProtocolHandle::Callback(Box::new(IdentifyProtocol::new( + self.id, + self.addr_mgr.clone(), + ))) } } -impl ServiceProtocol for IdentifyProtocol { +impl ServiceProtocol for IdentifyProtocol { fn init(&mut self, service: &mut ServiceContext) { self.listen_addrs = service .listens() @@ -97,16 +154,22 @@ impl ServiceProtocol for IdentifyProtocol { .map(|addr| multiaddr_to_socketaddr(addr).unwrap()) .collect(); - // TODO: magic number - service.set_service_notify(self.id, Duration::from_secs(1), CHECK_TIMEOUT_TOKEN); + service.set_service_notify( + self.id, + Duration::from_secs(CHECK_TIMEOUT_INTERVAL), + CHECK_TIMEOUT_TOKEN, + ); } fn connected(&mut self, service: &mut ServiceContext, session: &SessionContext, version: &str) { if session.remote_pubkey.is_none() { panic!("IdentifyProtocol require secio enabled!"); } - // TODO: magic number - let remote_info = RemoteInfo::new(session.clone(), version, Duration::from_secs(8)); + let remote_info = RemoteInfo::new( + session.clone(), + version, + Duration::from_secs(DEFAULT_TIMEOUT), + ); trace!("IdentifyProtocol sconnected from {:?}", remote_info.peer_id); self.remote_infos.insert(session.id, remote_info); @@ -114,6 +177,7 @@ impl ServiceProtocol for IdentifyProtocol { .observed_addrs .values() .chain(self.listen_addrs.iter()) + .take(MAX_ADDRS) .map(Clone::clone) .collect(); let data = IdentifyMessage::ListenAddrs(listen_addrs.into_iter().collect()).encode(); @@ -145,21 +209,48 @@ impl ServiceProtocol for IdentifyProtocol { match IdentifyMessage::decode(&data) { Some(IdentifyMessage::ListenAddrs(addrs)) => { if info.listen_addrs.is_some() { - // TODO report misbehavior: repeat send listen_addrs debug!("remote({:?}) repeat send observed address", info.peer_id); + if self + .addr_mgr + .misbehave(session.address.clone(), Misbehavior::DuplicateListenAddrs) + .is_disconnect() + { + service.disconnect(session.id); + } + } else if addrs.len() > MAX_ADDRS { + if self + .addr_mgr + .misbehave( + session.address.clone(), + Misbehavior::TooManyAddresses(addrs.len()), + ) + .is_disconnect() + { + service.disconnect(session.id); + } } else { trace!("received listen addresses: {:?}", addrs); + self.addr_mgr + .add_listen_addrs(session.address.clone(), addrs.clone()); info.listen_addrs = Some(addrs); } } Some(IdentifyMessage::ObservedAddr(addr)) => { if info.observed_addr.is_some() { - // TODO report misbehavior: repeat send listen_addrs debug!("remote({:?}) repeat send listen addresses", info.peer_id); + if self + .addr_mgr + .misbehave(session.address.clone(), Misbehavior::DuplicateObservedAddr) + .is_disconnect() + { + service.disconnect(session.id); + } } else { trace!("received observed address: {}", addr); info.observed_addr = Some(addr); // TODO how can we trust this address? + self.addr_mgr + .add_observed_addr(session.address.clone(), addr); self.observed_addrs.insert(info.peer_id.clone(), addr); } } @@ -168,18 +259,30 @@ impl ServiceProtocol for IdentifyProtocol { "IdentifyProtocol received invalid data from {:?}", info.peer_id ); - // TODO: report misbehavior: invalid data + if self + .addr_mgr + .misbehave(session.address.clone(), Misbehavior::InvalidData) + .is_disconnect() + { + service.disconnect(session.id); + } } } } fn notify(&mut self, service: &mut ServiceContext, _token: u64) { - for (session_id, info) in self.remote_infos.iter() { + for (session_id, info) in &self.remote_infos { if (info.listen_addrs.is_none() || info.observed_addr.is_none()) && (info.connected_at + info.timeout) <= Instant::now() { - // TODO: report timeout debug!("{:?} receive identify message timeout", info.peer_id); + if self + .addr_mgr + .misbehave(info.session.address.clone(), Misbehavior::Timeout) + .is_disconnect() + { + service.disconnect(*session_id); + } } } } From 463dd819fcf78970b6591b745cfbb43fa468db43 Mon Sep 17 00:00:00 2001 From: Qian Linfeng Date: Fri, 8 Mar 2019 12:38:34 +0800 Subject: [PATCH 12/17] Fix rebase --- examples/id.rs | 12 +++++++++--- identify/src/lib.rs | 8 ++++---- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/examples/id.rs b/examples/id.rs index 6aeae753..bed879a0 100644 --- a/examples/id.rs +++ b/examples/id.rs @@ -9,7 +9,7 @@ use tentacle::{ context::ServiceContext, multiaddr::Multiaddr, secio::SecioKeyPair, - service::{ServiceError, ServiceEvent}, + service::{DialProtocol, ServiceError, ServiceEvent}, traits::ServiceHandle, }; @@ -24,7 +24,10 @@ fn main() { .key_pair(SecioKeyPair::secp256k1_generated()) .forever(true) .build(SimpleHandler {}); - let _ = service.dial("/ip4/127.0.0.1/tcp/1338".parse().unwrap()); + let _ = service.dial( + "/ip4/127.0.0.1/tcp/1338".parse().unwrap(), + DialProtocol::All, + ); let _ = service.listen("/ip4/127.0.0.1/tcp/1337".parse().unwrap()); tokio::run(lazy(|| service.for_each(|_| Ok(())))) } else { @@ -34,7 +37,10 @@ fn main() { .key_pair(SecioKeyPair::secp256k1_generated()) .forever(true) .build(SimpleHandler {}); - let _ = service.dial("/ip4/127.0.0.1/tcp/1337".parse().unwrap()); + let _ = service.dial( + "/ip4/127.0.0.1/tcp/1337".parse().unwrap(), + DialProtocol::All, + ); let _ = service.listen("/ip4/127.0.0.1/tcp/1338".parse().unwrap()); tokio::run(lazy(|| service.for_each(|_| Ok(())))) } diff --git a/identify/src/lib.rs b/identify/src/lib.rs index b76c7953..3098f6ce 100644 --- a/identify/src/lib.rs +++ b/identify/src/lib.rs @@ -13,7 +13,7 @@ use p2p::{ context::{ServiceContext, SessionContext}, multiaddr::Multiaddr, secio::PeerId, - traits::{ProtocolHandle, ProtocolMeta, ServiceProtocol}, + traits::{Codec, ProtocolHandle, ProtocolMeta, ServiceProtocol}, utils::multiaddr_to_socketaddr, ProtocolId, SessionId, }; @@ -129,13 +129,13 @@ impl RemoteInfo { } } -impl ProtocolMeta for IdentifyProtocol { +impl ProtocolMeta for IdentifyProtocol { fn id(&self) -> ProtocolId { self.id } - fn codec(&self) -> LengthDelimitedCodec { - LengthDelimitedCodec::new() + fn codec(&self) -> Box { + Box::new(LengthDelimitedCodec::new()) } fn service_handle(&self) -> ProtocolHandle> { From 5721965db67479c58dc59aae0c26a7d5042c0587 Mon Sep 17 00:00:00 2001 From: Linfeng Qian Date: Mon, 11 Mar 2019 15:54:09 +0800 Subject: [PATCH 13/17] Fix PR review --- identify/src/lib.rs | 67 +++++++++++++++++++++++++++++---------------- 1 file changed, 43 insertions(+), 24 deletions(-) diff --git a/identify/src/lib.rs b/identify/src/lib.rs index 3098f6ce..7aa3dad1 100644 --- a/identify/src/lib.rs +++ b/identify/src/lib.rs @@ -8,10 +8,10 @@ use std::collections::{HashMap, HashSet}; use std::net::SocketAddr; use std::time::{Duration, Instant}; -use log::{debug, trace}; +use log::{debug, error, trace}; use p2p::{ context::{ServiceContext, SessionContext}, - multiaddr::Multiaddr, + multiaddr::{Multiaddr, ToMultiaddr}, secio::PeerId, traits::{Codec, ProtocolHandle, ProtocolMeta, ServiceProtocol}, utils::multiaddr_to_socketaddr, @@ -67,11 +67,11 @@ impl MisbehaveResult { /// The trait to communicate with underlying peer storage pub trait AddrManager: Clone + Send { /// Add remote peer's listen addresses - fn add_listen_addrs(&mut self, peer_addr: Multiaddr, addrs: Vec); + fn add_listen_addrs(&mut self, peer: &PeerId, addrs: Vec); /// Add our address observed by remote peer - fn add_observed_addr(&mut self, peer_addr: Multiaddr, addr: SocketAddr); + fn add_observed_addr(&mut self, peer: &PeerId, addr: Multiaddr); /// Report misbehavior - fn misbehave(&mut self, peer_addr: Multiaddr, kind: Misbehavior) -> MisbehaveResult; + fn misbehave(&mut self, peer: &PeerId, kind: Misbehavior) -> MisbehaveResult; } /// Identify protocol @@ -81,6 +81,7 @@ pub struct IdentifyProtocol { listen_addrs: Vec, observed_addrs: HashMap, remote_infos: HashMap, + secio_enabled: bool, } impl IdentifyProtocol { @@ -91,6 +92,7 @@ impl IdentifyProtocol { listen_addrs: Vec::new(), observed_addrs: HashMap::default(), remote_infos: HashMap::default(), + secio_enabled: true, } } } @@ -163,8 +165,12 @@ impl ServiceProtocol for IdentifyProtocol { fn connected(&mut self, service: &mut ServiceContext, session: &SessionContext, version: &str) { if session.remote_pubkey.is_none() { - panic!("IdentifyProtocol require secio enabled!"); + error!("IdentifyProtocol require secio enabled!"); + service.disconnect(session.id); + self.secio_enabled = false; + return; } + let remote_info = RemoteInfo::new( session.clone(), version, @@ -178,7 +184,7 @@ impl ServiceProtocol for IdentifyProtocol { .values() .chain(self.listen_addrs.iter()) .take(MAX_ADDRS) - .map(Clone::clone) + .cloned() .collect(); let data = IdentifyMessage::ListenAddrs(listen_addrs.into_iter().collect()).encode(); service.send_message(session.id, self.id, data); @@ -189,11 +195,13 @@ impl ServiceProtocol for IdentifyProtocol { } fn disconnected(&mut self, _service: &mut ServiceContext, session: &SessionContext) { - let info = self - .remote_infos - .remove(&session.id) - .expect("RemoteInfo must exists"); - trace!("IdentifyProtocol disconnected from {:?}", info.peer_id); + if self.secio_enabled { + let info = self + .remote_infos + .remove(&session.id) + .expect("RemoteInfo must exists"); + trace!("IdentifyProtocol disconnected from {:?}", info.peer_id); + } } fn received( @@ -202,6 +210,10 @@ impl ServiceProtocol for IdentifyProtocol { session: &SessionContext, data: bytes::Bytes, ) { + if !self.secio_enabled { + return; + } + let info = self .remote_infos .get_mut(&session.id) @@ -212,7 +224,7 @@ impl ServiceProtocol for IdentifyProtocol { debug!("remote({:?}) repeat send observed address", info.peer_id); if self .addr_mgr - .misbehave(session.address.clone(), Misbehavior::DuplicateListenAddrs) + .misbehave(&info.peer_id, Misbehavior::DuplicateListenAddrs) .is_disconnect() { service.disconnect(session.id); @@ -220,18 +232,19 @@ impl ServiceProtocol for IdentifyProtocol { } else if addrs.len() > MAX_ADDRS { if self .addr_mgr - .misbehave( - session.address.clone(), - Misbehavior::TooManyAddresses(addrs.len()), - ) + .misbehave(&info.peer_id, Misbehavior::TooManyAddresses(addrs.len())) .is_disconnect() { service.disconnect(session.id); } } else { trace!("received listen addresses: {:?}", addrs); + let multiaddr_addrs = addrs + .iter() + .filter_map(|addr| addr.to_multiaddr().ok()) + .collect::>(); self.addr_mgr - .add_listen_addrs(session.address.clone(), addrs.clone()); + .add_listen_addrs(&info.peer_id, multiaddr_addrs); info.listen_addrs = Some(addrs); } } @@ -240,7 +253,7 @@ impl ServiceProtocol for IdentifyProtocol { debug!("remote({:?}) repeat send listen addresses", info.peer_id); if self .addr_mgr - .misbehave(session.address.clone(), Misbehavior::DuplicateObservedAddr) + .misbehave(&info.peer_id, Misbehavior::DuplicateObservedAddr) .is_disconnect() { service.disconnect(session.id); @@ -249,8 +262,9 @@ impl ServiceProtocol for IdentifyProtocol { trace!("received observed address: {}", addr); info.observed_addr = Some(addr); // TODO how can we trust this address? - self.addr_mgr - .add_observed_addr(session.address.clone(), addr); + if let Ok(multiaddr) = addr.to_multiaddr() { + self.addr_mgr.add_observed_addr(&info.peer_id, multiaddr); + } self.observed_addrs.insert(info.peer_id.clone(), addr); } } @@ -261,7 +275,7 @@ impl ServiceProtocol for IdentifyProtocol { ); if self .addr_mgr - .misbehave(session.address.clone(), Misbehavior::InvalidData) + .misbehave(&info.peer_id, Misbehavior::InvalidData) .is_disconnect() { service.disconnect(session.id); @@ -271,14 +285,19 @@ impl ServiceProtocol for IdentifyProtocol { } fn notify(&mut self, service: &mut ServiceContext, _token: u64) { + if !self.secio_enabled { + return; + } + + let now = Instant::now(); for (session_id, info) in &self.remote_infos { if (info.listen_addrs.is_none() || info.observed_addr.is_none()) - && (info.connected_at + info.timeout) <= Instant::now() + && (info.connected_at + info.timeout) <= now { debug!("{:?} receive identify message timeout", info.peer_id); if self .addr_mgr - .misbehave(info.session.address.clone(), Misbehavior::Timeout) + .misbehave(&info.peer_id, Misbehavior::Timeout) .is_disconnect() { service.disconnect(*session_id); From 40a26ab614453f9481dec478320a20ca0d8b3710 Mon Sep 17 00:00:00 2001 From: Linfeng Qian Date: Mon, 11 Mar 2019 16:06:22 +0800 Subject: [PATCH 14/17] Fix rebase --- Cargo.toml | 2 +- examples/id.rs | 20 ++++++++++++-------- identify/Cargo.toml | 2 +- identify/src/lib.rs | 20 +------------------- 4 files changed, 15 insertions(+), 29 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d45f1297..2c18749c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,7 +32,7 @@ crossbeam-channel = "0.3.6" systemstat = "0.1.3" nix = "0.13.0" ping = { path = "ping", package = "tentacle-ping" } -identify = { path = "identify" } +identify = { path = "identify", package = "tentacle-identify" } generic-channel = { version = "0.2.0", features = ["all"] } [workspace] diff --git a/examples/id.rs b/examples/id.rs index bed879a0..8426f2dd 100644 --- a/examples/id.rs +++ b/examples/id.rs @@ -1,22 +1,26 @@ use env_logger; use log::debug; -use std::net::SocketAddr; use futures::{future::lazy, prelude::*}; use identify::{AddrManager, IdentifyProtocol, MisbehaveResult, Misbehavior}; use tentacle::{ - builder::ServiceBuilder, + builder::{MetaBuilder, ServiceBuilder}, context::ServiceContext, multiaddr::Multiaddr, - secio::SecioKeyPair, - service::{DialProtocol, ServiceError, ServiceEvent}, + secio::{PeerId, SecioKeyPair}, + service::{DialProtocol, ProtocolHandle, ServiceError, ServiceEvent}, traits::ServiceHandle, }; fn main() { env_logger::init(); let addr_mgr = SimpleAddrManager {}; - let protocol = IdentifyProtocol::new(1, addr_mgr); + let protocol = MetaBuilder::default() + .id(1) + .service_handle(move |meta| { + ProtocolHandle::Callback(Box::new(IdentifyProtocol::new(meta.id(), addr_mgr.clone()))) + }) + .build(); if std::env::args().nth(1) == Some("server".to_string()) { debug!("Starting server ......"); let mut service = ServiceBuilder::default() @@ -51,11 +55,11 @@ struct SimpleAddrManager {} impl AddrManager for SimpleAddrManager { /// Add remote peer's listen addresses - fn add_listen_addrs(&mut self, _peer_addr: Multiaddr, _addrs: Vec) {} + fn add_listen_addrs(&mut self, _peer: &PeerId, _addrs: Vec) {} /// Add our address observed by remote peer - fn add_observed_addr(&mut self, _peer_addr: Multiaddr, _addr: SocketAddr) {} + fn add_observed_addr(&mut self, _peer: &PeerId, _addr: Multiaddr) {} /// Report misbehavior - fn misbehave(&mut self, _peer_addr: Multiaddr, _kind: Misbehavior) -> MisbehaveResult { + fn misbehave(&mut self, _peer: &PeerId, _kind: Misbehavior) -> MisbehaveResult { MisbehaveResult::Disconnect } } diff --git a/identify/Cargo.toml b/identify/Cargo.toml index 9498b875..eba924ba 100644 --- a/identify/Cargo.toml +++ b/identify/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "identify" +name = "tentacle-identify" version = "0.1.0" authors = ["Qian Linfeng "] edition = "2018" diff --git a/identify/src/lib.rs b/identify/src/lib.rs index 7aa3dad1..d8b26874 100644 --- a/identify/src/lib.rs +++ b/identify/src/lib.rs @@ -13,11 +13,10 @@ use p2p::{ context::{ServiceContext, SessionContext}, multiaddr::{Multiaddr, ToMultiaddr}, secio::PeerId, - traits::{Codec, ProtocolHandle, ProtocolMeta, ServiceProtocol}, + traits::ServiceProtocol, utils::multiaddr_to_socketaddr, ProtocolId, SessionId, }; -use tokio::codec::length_delimited::LengthDelimitedCodec; use protocol::IdentifyMessage; @@ -131,23 +130,6 @@ impl RemoteInfo { } } -impl ProtocolMeta for IdentifyProtocol { - fn id(&self) -> ProtocolId { - self.id - } - - fn codec(&self) -> Box { - Box::new(LengthDelimitedCodec::new()) - } - - fn service_handle(&self) -> ProtocolHandle> { - ProtocolHandle::Callback(Box::new(IdentifyProtocol::new( - self.id, - self.addr_mgr.clone(), - ))) - } -} - impl ServiceProtocol for IdentifyProtocol { fn init(&mut self, service: &mut ServiceContext) { self.listen_addrs = service From 41a528131b2e125e57c3885d6d9ce51971f4b4d0 Mon Sep 17 00:00:00 2001 From: Linfeng Qian Date: Wed, 13 Mar 2019 11:39:04 +0800 Subject: [PATCH 15/17] Use add_observed_addr to get feedback from peerstore --- examples/id.rs | 4 +++- identify/src/lib.rs | 10 ++++++++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/examples/id.rs b/examples/id.rs index 8426f2dd..6677f2fc 100644 --- a/examples/id.rs +++ b/examples/id.rs @@ -57,7 +57,9 @@ impl AddrManager for SimpleAddrManager { /// Add remote peer's listen addresses fn add_listen_addrs(&mut self, _peer: &PeerId, _addrs: Vec) {} /// Add our address observed by remote peer - fn add_observed_addr(&mut self, _peer: &PeerId, _addr: Multiaddr) {} + fn add_observed_addr(&mut self, _peer: &PeerId, _addr: Multiaddr) -> MisbehaveResult { + MisbehaveResult::Continue + } /// Report misbehavior fn misbehave(&mut self, _peer: &PeerId, _kind: Misbehavior) -> MisbehaveResult { MisbehaveResult::Disconnect diff --git a/identify/src/lib.rs b/identify/src/lib.rs index d8b26874..949bee56 100644 --- a/identify/src/lib.rs +++ b/identify/src/lib.rs @@ -68,7 +68,7 @@ pub trait AddrManager: Clone + Send { /// Add remote peer's listen addresses fn add_listen_addrs(&mut self, peer: &PeerId, addrs: Vec); /// Add our address observed by remote peer - fn add_observed_addr(&mut self, peer: &PeerId, addr: Multiaddr); + fn add_observed_addr(&mut self, peer: &PeerId, addr: Multiaddr) -> MisbehaveResult; /// Report misbehavior fn misbehave(&mut self, peer: &PeerId, kind: Misbehavior) -> MisbehaveResult; } @@ -245,7 +245,13 @@ impl ServiceProtocol for IdentifyProtocol { info.observed_addr = Some(addr); // TODO how can we trust this address? if let Ok(multiaddr) = addr.to_multiaddr() { - self.addr_mgr.add_observed_addr(&info.peer_id, multiaddr); + if self + .addr_mgr + .add_observed_addr(&info.peer_id, multiaddr) + .is_disconnect() + { + service.disconnect(session.id); + } } self.observed_addrs.insert(info.peer_id.clone(), addr); } From b0e70d3956bac936f89475f554f52f06b9037496 Mon Sep 17 00:00:00 2001 From: Linfeng Qian Date: Wed, 13 Mar 2019 11:44:07 +0800 Subject: [PATCH 16/17] Fix identify example --- examples/id.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/id.rs b/examples/id.rs index 6677f2fc..519b66a7 100644 --- a/examples/id.rs +++ b/examples/id.rs @@ -17,8 +17,8 @@ fn main() { let addr_mgr = SimpleAddrManager {}; let protocol = MetaBuilder::default() .id(1) - .service_handle(move |meta| { - ProtocolHandle::Callback(Box::new(IdentifyProtocol::new(meta.id(), addr_mgr.clone()))) + .service_handle(move || { + ProtocolHandle::Callback(Box::new(IdentifyProtocol::new(1, addr_mgr))) }) .build(); if std::env::args().nth(1) == Some("server".to_string()) { From 5c341eb3e5119303c0b845ad21262127d5bab111 Mon Sep 17 00:00:00 2001 From: Qian Linfeng Date: Thu, 14 Mar 2019 14:57:13 +0800 Subject: [PATCH 17/17] Change adddress format to multiaddr --- identify/src/lib.rs | 47 ++++++++++-------------------- identify/src/protocol.fbs | 3 +- identify/src/protocol.rs | 35 +++++----------------- identify/src/protocol_generated.rs | 28 +++++------------- 4 files changed, 33 insertions(+), 80 deletions(-) diff --git a/identify/src/lib.rs b/identify/src/lib.rs index 949bee56..90db2ab2 100644 --- a/identify/src/lib.rs +++ b/identify/src/lib.rs @@ -5,16 +5,14 @@ mod protocol_generated; mod protocol; use std::collections::{HashMap, HashSet}; -use std::net::SocketAddr; use std::time::{Duration, Instant}; use log::{debug, error, trace}; use p2p::{ context::{ServiceContext, SessionContext}, - multiaddr::{Multiaddr, ToMultiaddr}, + multiaddr::Multiaddr, secio::PeerId, traits::ServiceProtocol, - utils::multiaddr_to_socketaddr, ProtocolId, SessionId, }; @@ -77,8 +75,8 @@ pub trait AddrManager: Clone + Send { pub struct IdentifyProtocol { id: ProtocolId, addr_mgr: T, - listen_addrs: Vec, - observed_addrs: HashMap, + listen_addrs: Vec, + observed_addrs: HashMap, remote_infos: HashMap, secio_enabled: bool, } @@ -107,8 +105,8 @@ pub(crate) struct RemoteInfo { connected_at: Instant, timeout: Duration, - listen_addrs: Option>, - observed_addr: Option, + listen_addrs: Option>, + observed_addr: Option, } impl RemoteInfo { @@ -132,11 +130,7 @@ impl RemoteInfo { impl ServiceProtocol for IdentifyProtocol { fn init(&mut self, service: &mut ServiceContext) { - self.listen_addrs = service - .listens() - .iter() - .map(|addr| multiaddr_to_socketaddr(addr).unwrap()) - .collect(); + self.listen_addrs = service.listens().clone(); service.set_service_notify( self.id, @@ -161,7 +155,7 @@ impl ServiceProtocol for IdentifyProtocol { trace!("IdentifyProtocol sconnected from {:?}", remote_info.peer_id); self.remote_infos.insert(session.id, remote_info); - let listen_addrs: HashSet = self + let listen_addrs: HashSet = self .observed_addrs .values() .chain(self.listen_addrs.iter()) @@ -170,9 +164,7 @@ impl ServiceProtocol for IdentifyProtocol { .collect(); let data = IdentifyMessage::ListenAddrs(listen_addrs.into_iter().collect()).encode(); service.send_message(session.id, self.id, data); - let remote_addr = - multiaddr_to_socketaddr(&session.address).expect("Can not get remote address"); - let data = IdentifyMessage::ObservedAddr(remote_addr).encode(); + let data = IdentifyMessage::ObservedAddr(session.address.clone()).encode(); service.send_message(session.id, self.id, data); } @@ -221,12 +213,7 @@ impl ServiceProtocol for IdentifyProtocol { } } else { trace!("received listen addresses: {:?}", addrs); - let multiaddr_addrs = addrs - .iter() - .filter_map(|addr| addr.to_multiaddr().ok()) - .collect::>(); - self.addr_mgr - .add_listen_addrs(&info.peer_id, multiaddr_addrs); + self.addr_mgr.add_listen_addrs(&info.peer_id, addrs.clone()); info.listen_addrs = Some(addrs); } } @@ -242,16 +229,14 @@ impl ServiceProtocol for IdentifyProtocol { } } else { trace!("received observed address: {}", addr); - info.observed_addr = Some(addr); + info.observed_addr = Some(addr.clone()); // TODO how can we trust this address? - if let Ok(multiaddr) = addr.to_multiaddr() { - if self - .addr_mgr - .add_observed_addr(&info.peer_id, multiaddr) - .is_disconnect() - { - service.disconnect(session.id); - } + if self + .addr_mgr + .add_observed_addr(&info.peer_id, addr.clone()) + .is_disconnect() + { + service.disconnect(session.id); } self.observed_addrs.insert(info.peer_id.clone(), addr); } diff --git a/identify/src/protocol.fbs b/identify/src/protocol.fbs index 7be34f22..24d75c89 100644 --- a/identify/src/protocol.fbs +++ b/identify/src/protocol.fbs @@ -18,6 +18,5 @@ table ObservedAddr { } table Address { - ip: [ubyte]; - port: ushort; + bytes: [ubyte]; } diff --git a/identify/src/protocol.rs b/identify/src/protocol.rs index 94a52723..9925d63b 100644 --- a/identify/src/protocol.rs +++ b/identify/src/protocol.rs @@ -1,5 +1,3 @@ -use std::net::{IpAddr, SocketAddr}; - use flatbuffers::{get_root, FlatBufferBuilder, WIPOffset}; use crate::protocol_generated::p2p::identify::{ @@ -7,11 +5,12 @@ use crate::protocol_generated::p2p::identify::{ IdentifyMessageBuilder, IdentifyPayload as FbsIdentifyPayload, ListenAddrs as FbsListenAddrs, ListenAddrsBuilder, ObservedAddr as FbsObservedAddr, ObservedAddrBuilder, }; +use p2p::multiaddr::Multiaddr; #[derive(Clone, PartialEq, Eq, Debug)] pub enum IdentifyMessage { - ListenAddrs(Vec), - ObservedAddr(SocketAddr), + ListenAddrs(Vec), + ObservedAddr(Multiaddr), } impl IdentifyMessage { @@ -76,32 +75,14 @@ impl IdentifyMessage { fn addr_to_offset<'b>( fbb: &mut FlatBufferBuilder<'b>, - addr: &SocketAddr, + addr: &Multiaddr, ) -> WIPOffset> { - let ip = match addr.ip() { - IpAddr::V4(ipv4) => fbb.create_vector(&ipv4.octets()), - IpAddr::V6(ipv6) => fbb.create_vector(&ipv6.octets()), - }; + let bytes = fbb.create_vector(&addr.to_bytes()); let mut addr_builder = AddressBuilder::new(fbb); - addr_builder.add_ip(ip); - addr_builder.add_port(addr.port()); + addr_builder.add_bytes(bytes); addr_builder.finish() } -fn fbs_to_addr(addr: &FbsAddress) -> Option { - let ip_bytes = addr.ip()?; - let ip_addr = match ip_bytes.len() { - 4 => { - let mut data = [0u8; 4]; - data.copy_from_slice(ip_bytes); - Some(IpAddr::from(data)) - } - 16 => { - let mut data = [0u8; 16]; - data.copy_from_slice(ip_bytes); - Some(IpAddr::from(data)) - } - _ => None, - }; - ip_addr.map(|ip| SocketAddr::new(ip, addr.port())) +fn fbs_to_addr(addr: &FbsAddress) -> Option { + Multiaddr::from_bytes(addr.bytes()?.to_vec()).ok() } diff --git a/identify/src/protocol_generated.rs b/identify/src/protocol_generated.rs index 7b5657e7..3fb57f58 100644 --- a/identify/src/protocol_generated.rs +++ b/identify/src/protocol_generated.rs @@ -377,34 +377,26 @@ impl<'a> Address<'a> { _fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr>, args: &'args AddressArgs<'args>) -> flatbuffers::WIPOffset> { let mut builder = AddressBuilder::new(_fbb); - if let Some(x) = args.ip { builder.add_ip(x); } - builder.add_port(args.port); + if let Some(x) = args.bytes { builder.add_bytes(x); } builder.finish() } - pub const VT_IP: flatbuffers::VOffsetT = 4; - pub const VT_PORT: flatbuffers::VOffsetT = 6; + pub const VT_BYTES: flatbuffers::VOffsetT = 4; #[inline] - pub fn ip(&self) -> Option<&'a [u8]> { - self._tab.get::>>(Address::VT_IP, None).map(|v| v.safe_slice()) - } - #[inline] - pub fn port(&self) -> u16 { - self._tab.get::(Address::VT_PORT, Some(0)).unwrap() + pub fn bytes(&self) -> Option<&'a [u8]> { + self._tab.get::>>(Address::VT_BYTES, None).map(|v| v.safe_slice()) } } pub struct AddressArgs<'a> { - pub ip: Option>>, - pub port: u16, + pub bytes: Option>>, } impl<'a> Default for AddressArgs<'a> { #[inline] fn default() -> Self { AddressArgs { - ip: None, - port: 0, + bytes: None, } } } @@ -414,12 +406,8 @@ pub struct AddressBuilder<'a: 'b, 'b> { } impl<'a: 'b, 'b> AddressBuilder<'a, 'b> { #[inline] - pub fn add_ip(&mut self, ip: flatbuffers::WIPOffset>) { - self.fbb_.push_slot_always::>(Address::VT_IP, ip); - } - #[inline] - pub fn add_port(&mut self, port: u16) { - self.fbb_.push_slot::(Address::VT_PORT, port, 0); + pub fn add_bytes(&mut self, bytes: flatbuffers::WIPOffset>) { + self.fbb_.push_slot_always::>(Address::VT_BYTES, bytes); } #[inline] pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a>) -> AddressBuilder<'a, 'b> {