Skip to content

Commit

Permalink
Add Repo::peer_state to find out about the sync state w.r.t a peer
Browse files Browse the repository at this point in the history
and doc

Problem: It's often useful to know what the remote state of a document
we are synchronizing is. We have a lot of information about this at the
automerge level but it's not exposed to users of `Repo`.

Solution: Expose automerge sync state information as well as keep track
of when we last received and sent messages.
  • Loading branch information
alexjg committed Nov 20, 2024
1 parent 9184ea6 commit 2b92e49
Show file tree
Hide file tree
Showing 5 changed files with 429 additions and 57 deletions.
13 changes: 13 additions & 0 deletions src/interfaces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use serde::{Deserialize, Serialize};
use std::{
fmt::{Display, Formatter},
str::FromStr,
time::Instant,
};

#[derive(Debug, Eq, Hash, PartialEq, Clone)]
Expand Down Expand Up @@ -193,3 +194,15 @@ pub trait Storage: Send {
_full_doc: Vec<u8>,
) -> BoxFuture<'static, Result<(), StorageError>>;
}

/// The state of sycnhronization of a document with a remote peer obtained via [`RepoHandle::peer_state`](crate::RepoHandle::peer_state)
pub struct PeerState {
/// When we last received a message from this peer
pub last_received: Option<Instant>,
/// When we last sent a message to this peer
pub last_sent: Option<Instant>,
/// The heads of the document when we last sent a message
pub last_sent_heads: Option<Vec<automerge::ChangeHash>>,
/// The last heads of the document that the peer said they had
pub last_acked_heads: Option<Vec<automerge::ChangeHash>>,
}
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub use share_policy::{SharePolicy, SharePolicyError};

pub use crate::dochandle::DocHandle;
pub use crate::interfaces::{
DocumentId, Message, NetworkError, RepoId, RepoMessage, Storage, StorageError,
DocumentId, Message, NetworkError, PeerState, RepoId, RepoMessage, Storage, StorageError,
};
pub use crate::network_connect::ConnDirection;
pub use crate::repo::{Repo, RepoError, RepoHandle};
Expand Down
208 changes: 152 additions & 56 deletions src/repo.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::dochandle::{DocHandle, SharedDocument};
use crate::interfaces::{DocumentId, RepoId};
use crate::interfaces::{DocumentId, PeerState, RepoId};
use crate::interfaces::{NetworkError, RepoMessage, Storage, StorageError};
use crate::share_policy::ShareDecision;
use crate::{share_policy, SharePolicy, SharePolicyError};
Expand All @@ -21,6 +21,7 @@ use std::mem;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Instant;
use uuid::Uuid;

/// Front-end of the repo.
Expand Down Expand Up @@ -210,11 +211,27 @@ impl RepoHandle {
})
.expect("Failed to send repo event.");
}

pub fn peer_state(
&self,
remote_id: RepoId,
document: DocumentId,
) -> RepoFuture<Option<PeerState>> {
let (fut, resolver) = new_repo_future_with_resolver();
self.repo_sender
.send(RepoEvent::GetPeerState {
remote_repo_id: remote_id,
document_id: document,
reply: resolver,
})
.expect("failed to send repo event");
fut
}
}

/// Events sent by repo or doc handles to the repo.
pub(crate) enum RepoEvent {
/// Start processing a new document.
/// Start processing a ew document.
NewDoc(DocumentId, DocumentInfo),
/// A document changed.
DocChange(DocumentId),
Expand All @@ -239,6 +256,11 @@ pub(crate) enum RepoEvent {
stream: Box<dyn Send + Unpin + Stream<Item = Result<RepoMessage, NetworkError>>>,
sink: Box<dyn Send + Unpin + Sink<RepoMessage, Error = NetworkError>>,
},
GetPeerState {
remote_repo_id: RepoId,
document_id: DocumentId,
reply: RepoFutureResolver<Option<PeerState>>,
},
/// Stop the repo.
Stop,
}
Expand All @@ -253,6 +275,7 @@ impl fmt::Debug for RepoEvent {
RepoEvent::LoadDoc(_, _) => f.write_str("RepoEvent::LoadDoc"),
RepoEvent::ListAllDocs(_) => f.write_str("RepoEvent::ListAllDocs"),
RepoEvent::ConnectRemoteRepo { .. } => f.write_str("RepoEvent::ConnectRemoteRepo"),
RepoEvent::GetPeerState { .. } => f.write_str("RepoEvent::GetPeerState"),
RepoEvent::Stop => f.write_str("RepoEvent::Stop"),
}
}
Expand Down Expand Up @@ -573,19 +596,32 @@ pub(crate) struct DocumentInfo {
last_heads: Vec<ChangeHash>,
}

/// A state machine representing a connection between a remote repo and a particular document
#[derive(Debug)]
enum PeerConnection {
/// we've accepted the peer and are syncing with them
Accepted(SyncState),
/// We're waiting for a response from the share policy
PendingAuth { received_messages: Vec<SyncMessage> },
struct PeerConnection {
repo_id: RepoId,
last_recv: Option<Instant>,
last_send: Option<Instant>,
state: PeerConnectionState,
}

impl PeerConnection {
fn pending() -> Self {
PeerConnection::PendingAuth {
received_messages: vec![],
fn pending(repo_id: RepoId) -> Self {
Self {
repo_id,
last_recv: None,
last_send: None,
state: PeerConnectionState::PendingAuth {
received_messages: vec![],
},
}
}

fn ready(repo_id: RepoId) -> Self {
Self {
repo_id,
last_recv: None,
last_send: None,
state: PeerConnectionState::Accepted(SyncState::new()),
}
}

Expand All @@ -594,23 +630,97 @@ impl PeerConnection {
doc: &mut Automerge,
msg: SyncMessage,
) -> Result<(), automerge::AutomergeError> {
match self {
PeerConnection::Accepted(sync_state) => doc.receive_sync_message(sync_state, msg),
PeerConnection::PendingAuth { received_messages } => {
self.last_recv = Some(Instant::now());
match &mut self.state {
PeerConnectionState::Accepted(sync_state) => doc.receive_sync_message(sync_state, msg),
PeerConnectionState::PendingAuth { received_messages } => {
received_messages.push(msg);
Ok(())
}
}
}

fn generate_first_sync_message(
&mut self,
document: &mut automerge::Automerge,
) -> Option<SyncMessage> {
let msg = match &mut self.state {
PeerConnectionState::PendingAuth { received_messages } => {
let mut sync_state = SyncState::new();
for msg in received_messages.drain(..) {
document
.receive_sync_message(&mut sync_state, msg)
.expect("Failed to receive sync message.");
}
let message = document.generate_sync_message(&mut sync_state);
self.state = PeerConnectionState::Accepted(sync_state);
message
}
PeerConnectionState::Accepted(sync_state) => document.generate_sync_message(sync_state),
};
if msg.is_some() {
self.last_send = Some(Instant::now());
}
msg
}

fn generate_sync_message(&mut self, doc: &Automerge) -> Option<SyncMessage> {
match self {
Self::Accepted(sync_state) => doc.generate_sync_message(sync_state),
Self::PendingAuth { .. } => None,
let msg = match &mut self.state {
PeerConnectionState::Accepted(sync_state) => doc.generate_sync_message(sync_state),
PeerConnectionState::PendingAuth { .. } => None,
};
if msg.is_some() {
self.last_send = Some(Instant::now());
}
msg
}

fn promote_pending_peer(&mut self) -> Option<Vec<SyncMessage>> {
if let PeerConnectionState::PendingAuth { received_messages } = &mut self.state {
let result = std::mem::take(received_messages);
self.state = PeerConnectionState::Accepted(SyncState::new());
if !result.is_empty() {
self.last_send = Some(Instant::now());
}
Some(result)
} else {
tracing::warn!(remote=%self.repo_id, "Tried to promote a peer which was not pending authorization");
None
}
}

/// Get the state of synchronization with a remote peer and document
fn peer_state(&self) -> PeerState {
let last_sent_heads = match &self.state {
PeerConnectionState::Accepted(sync_state) => Some(sync_state.last_sent_heads.clone()),
PeerConnectionState::PendingAuth {
received_messages: _,
} => None,
};
let last_acked_heads = match &self.state {
PeerConnectionState::Accepted(sync_state) => Some(sync_state.shared_heads.clone()),
PeerConnectionState::PendingAuth {
received_messages: _,
} => None,
};
PeerState {
last_received: self.last_recv,
last_sent: self.last_send,
last_sent_heads,
last_acked_heads,
}
}
}

/// A state machine representing a connection between a remote repo and a particular document
#[derive(Debug)]
enum PeerConnectionState {
/// we've accepted the peer and are syncing with them
Accepted(SyncState),
/// We're waiting for a response from the share policy
PendingAuth { received_messages: Vec<SyncMessage> },
}

/// A change requested by a peer connection
enum PeerConnCommand {
/// Request authorization from the share policy
Expand Down Expand Up @@ -842,7 +952,7 @@ impl DocumentInfo {
Entry::Vacant(entry) => {
// if this is a new peer, request authorization
commands.push(PeerConnCommand::RequestAuth(repo_id.clone()));
entry.insert(PeerConnection::pending())
entry.insert(PeerConnection::pending(repo_id.clone()))
}
Entry::Occupied(entry) => entry.into_mut(),
};
Expand All @@ -861,48 +971,19 @@ impl DocumentInfo {
///
/// Returns any messages which the peer sent while we were waiting for authorization
fn promote_pending_peer(&mut self, repo_id: &RepoId) -> Option<Vec<SyncMessage>> {
if let Some(PeerConnection::PendingAuth { received_messages }) =
self.peer_connections.remove(repo_id)
{
self.peer_connections
.insert(repo_id.clone(), PeerConnection::Accepted(SyncState::new()));
Some(received_messages)
} else {
tracing::warn!(remote=%repo_id, "Tried to promote a peer which was not pending authorization");
None
}
self.peer_connections
.get_mut(repo_id)
.map(|c| c.promote_pending_peer())
.unwrap_or_default()
}

/// Potentially generate an outgoing sync message.
fn generate_first_sync_message(&mut self, repo_id: RepoId) -> Option<SyncMessage> {
match self.peer_connections.entry(repo_id) {
Entry::Vacant(entry) => {
let mut sync_state = SyncState::new();
let document = self.document.read();
let message = document.automerge.generate_sync_message(&mut sync_state);
entry.insert(PeerConnection::Accepted(sync_state));
message
}
Entry::Occupied(mut entry) => match entry.get_mut() {
PeerConnection::PendingAuth { received_messages } => {
let mut document = self.document.write();
let mut sync_state = SyncState::new();
for msg in received_messages.drain(..) {
document
.automerge
.receive_sync_message(&mut sync_state, msg)
.expect("Failed to receive sync message.");
}
let message = document.automerge.generate_sync_message(&mut sync_state);
entry.insert(PeerConnection::Accepted(sync_state));
message
}
PeerConnection::Accepted(ref mut sync_state) => {
let document = self.document.read();
document.automerge.generate_sync_message(sync_state)
}
},
}
let conn = self
.peer_connections
.entry(repo_id.clone())
.or_insert_with(|| PeerConnection::ready(repo_id));
conn.generate_first_sync_message(&mut self.document.write().automerge)
}

/// Generate outgoing sync message for all repos we are syncing with.
Expand All @@ -916,6 +997,10 @@ impl DocumentInfo {
})
.collect()
}

fn get_peer_state(&self, peer: &RepoId) -> Option<PeerState> {
self.peer_connections.get(peer).map(|p| p.peer_state())
}
}

/// Signal that the stream or sink on the network adapter is ready to be polled.
Expand Down Expand Up @@ -1522,6 +1607,17 @@ impl Repo {
self.sinks_to_poll.insert(repo_id.clone());
self.streams_to_poll.insert(repo_id);
}
RepoEvent::GetPeerState {
remote_repo_id,
document_id,
mut reply,
} => {
reply.resolve_fut(
self.documents
.get(&document_id)
.and_then(|info| info.get_peer_state(&remote_repo_id)),
);
}
RepoEvent::Stop => {
// Handled in the main run loop.
}
Expand Down
1 change: 1 addition & 0 deletions tests/network/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ mod document_list;
mod document_load;
mod document_request;
mod document_save;
mod peer_state;

use test_log::test;

Expand Down
Loading

0 comments on commit 2b92e49

Please sign in to comment.