
Refactor IPC API implementation #116

Merged · 22 commits into MolotovCherry:master · Jun 24, 2024

Conversation

@Dampfwalze (Contributor) commented Apr 13, 2024

This is my take on the IPC API.

The driver events are now expressed as a Stream, which also allows for multiple listeners.

Using the sync API, one can now add many receiver callbacks. Termination is now expressed using a boolean return value:

driver.add_event_receiver(|cmd| {
    // ...
    // return false to stop receiving further events
    !finished
});

Maybe a subscription object is better?

let subscription = driver.add_event_receiver(|cmd| { ... });
...
subscription.cancel();
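
For illustration, here is a minimal std-only sketch of what such a subscription object could look like (all names here are hypothetical stand-ins, not the actual driver-ipc API, and events are simplified to strings):

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{mpsc, Arc};
use std::thread;

// Hypothetical subscription handle; cancel() stops event delivery.
pub struct EventsSubscription {
    cancelled: Arc<AtomicBool>,
    handle: thread::JoinHandle<()>,
}

impl EventsSubscription {
    pub fn cancel(self) {
        self.cancelled.store(true, Ordering::SeqCst);
        // The worker exits once the channel closes or the flag is seen.
        let _ = self.handle.join();
    }
}

// Deliver events from `events` to `cb` on a background thread.
pub fn add_event_receiver<F>(events: mpsc::Receiver<String>, mut cb: F) -> EventsSubscription
where
    F: FnMut(String) + Send + 'static,
{
    let cancelled = Arc::new(AtomicBool::new(false));
    let flag = Arc::clone(&cancelled);
    let handle = thread::spawn(move || {
        while let Ok(ev) = events.recv() {
            if flag.load(Ordering::SeqCst) {
                break;
            }
            cb(ev);
        }
    });
    EventsSubscription { cancelled, handle }
}

fn run_demo() -> Vec<String> {
    let (tx, rx) = mpsc::channel();
    let (seen_tx, seen_rx) = mpsc::channel();
    let sub = add_event_receiver(rx, move |ev| {
        seen_tx.send(ev).unwrap();
    });
    tx.send("Monitor added".to_string()).unwrap();
    let first = seen_rx.recv().unwrap();
    drop(tx); // close the event channel so the worker can exit
    sub.cancel();
    vec![first]
}

fn main() {
    println!("{:?}", run_demo());
}
```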

(Changes are not fully tested)

@MolotovCherry (Owner) commented Apr 13, 2024

This was the last remaining thing on my todo list to handle properly. This is actually the kind of implementation I was looking for, but didn't get around to yet.

I would prefer a subscription object instead, but yeah, this is overall looking quite good!

@MolotovCherry (Owner) left a review comment

I love where this concept is going. This implementation looks good. Thanks for the contribution!

P.S. The build is failing because of the python bindings. You can ignore the build failure.

Review threads (resolved): rust/driver-ipc/src/driver_client.rs, rust/driver-ipc/src/client.rs, rust/driver-ipc/src/sync/client.rs
@Dampfwalze (Contributor, Author)

I was thinking about the possibility of easily reverting the driver's state to the persistent state. There could be a method Client::revert() or Client::revert_to_persistent() that would load back the saved state from the registry.

@MolotovCherry (Owner)

> I was thinking about the possibility of easily reverting the driver's state to the persistent state. There could be a method Client::revert() or Client::revert_to_persistent() that would load back the saved state from the registry.

That's a good suggestion.

What do you think about having the method fetch and convert the persistent state to Vec<Monitor> and letting the caller do what they want with it? We could keep the one that also reverts the driver in one call as well, if it's still desirable.

@Dampfwalze (Contributor, Author)

> What do you think about having the method fetch and convert the persistent state to Vec<Monitor> and letting the caller do what they want with it? We could keep the one that also reverts the driver in one call as well, if it's still desirable.

In any case, there could be a method on the Client that deserializes the persistent state: Client::read_persistent() -> Vec<Monitor> (also forwarded in DriverClient?).

Then, there can be a method on the DriverClient to read this state and write it into its own state: DriverClient::load_persistent().

But Client::revert_to_persistent() is still good to have (also forwarded in DriverClient).
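
As a rough sketch of how these proposed methods might fit together (Monitor and Client here are simplified stand-ins, not the real driver-ipc types):

```rust
// All method names follow the proposal above; the types are
// simplified stand-ins, not the real driver-ipc ones.
#[derive(Clone, Debug, PartialEq)]
pub struct Monitor { pub id: u32 }

#[derive(Default)]
pub struct Client {
    pub persisted: Vec<Monitor>, // state saved in the registry
    pub live: Vec<Monitor>,      // state currently in the driver
}

impl Client {
    /// Client::read_persistent(): deserialize the saved state.
    pub fn read_persistent(&self) -> Vec<Monitor> {
        self.persisted.clone()
    }

    /// Client::revert_to_persistent(): load the saved state back
    /// into the driver in one call.
    pub fn revert_to_persistent(&mut self) {
        self.live = self.read_persistent();
    }
}

fn main() {
    let mut client = Client { persisted: vec![Monitor { id: 1 }], live: vec![] };
    client.revert_to_persistent();
    assert_eq!(client.live, client.read_persistent());
    println!("{:?}", client.live);
}
```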

@MolotovCherry (Owner)

> In any case, there could be a method on the Client that deserializes the persistent state: Client::read_persistent() -> Vec<Monitor> (also forwarded in DriverClient?).
>
> Then, there can be a method on the DriverClient to read this state and write it into its own state: DriverClient::load_persistent().
>
> But Client::revert_to_persistent() is still good to have (also forwarded in DriverClient).

That sounds perfect. I'll add that after this PR is merged.

@MolotovCherry MolotovCherry mentioned this pull request Apr 14, 2024
@Dampfwalze (Contributor, Author) commented Apr 14, 2024

> I would prefer a subscription object instead

I have implemented this now.

One thing to note: there is no good way to cancel the subscription from within the callback itself now. Doing this will involve creating some shared state into which the subscription is moved after subscribing, something like Arc<Mutex<Option<EventsSubscription>>>.

Example:

let shared_sub = Arc::new(Mutex::new(None::<EventsSubscription>));

let sub = client.add_event_receiver({
    let shared_sub = shared_sub.clone();
    move |event| {
        println!("{:?}", event);
        // ...
        shared_sub.lock().unwrap().as_mut().unwrap().cancel();
    }
});

*shared_sub.lock().unwrap() = Some(sub);

Also: what should happen if the callback panics? Currently the task would just stop without notice. Maybe EventsSubscription::cancel() could return a Result? What would the error type be then?

@MolotovCherry (Owner) commented Apr 14, 2024

> I have implemented this now.
>
> [...]
>
> Also: what should happen if the callback panics? Currently the task would just stop without notice. Maybe EventsSubscription::cancel() could return a Result? What would the error type be then?

I don't feel it matters much here if the callback panics. It does print a message, and the task exiting on panic is expected (similar to what would happen if a thread panicked). Though a note could be added if there's anything we want the user to be clear about. Do you feel it would be useful to catch unwind, mark the panic, resume unwind, and notify on cancel?

Perhaps it might be better to just catch it and re-raise it on the main thread instead. This way it becomes a lot clearer since everything crashes, instead of silently failing.

I just did some testing and found that the cb isn't getting executed. The streams are receiving as expected; it seems to stop working at RUNTIME.spawn in EventsSubscription. A block_on does execute it all the way (just a test to see if that was the issue), but spawn seems not to.

@Dampfwalze (Contributor, Author) commented Apr 14, 2024

> I just did some testing and found that the cb isn't getting executed. The streams are receiving as expected; it seems to stop working at RUNTIME.spawn in EventsSubscription. A block_on does execute it all the way (just a test to see if that was the issue), but spawn seems not to.

This is weird, because all unit tests pass and they do check whether the callback is run. I need a bit more context to make sense of the situation here. Maybe the runtime was blocked somehow in your tests? Did you call the API from within the runtime? RUNTIME.spawn() does not immediately start execution; it just sends the task off to be scheduled.

You can try tokio::task::yield_now().await after calling add_event_receiver() to give execution back to tokio.

@MolotovCherry (Owner) commented Apr 14, 2024

> This is weird, because all unit tests pass and they do check whether the callback is run. I need a bit more context to make sense of the situation here. Maybe the runtime was blocked somehow in your tests? Did you call the API from within the runtime? RUNTIME.spawn() does not immediately start execution; it just sends the task off to be scheduled.
>
> You can try tokio::task::yield_now().await after calling add_event_receiver() to give execution back to tokio.

So far from what I've been able to gather, the test event_receiver_cancel_from_cb() still passes after a panic is introduced into the callback. The first one (event_receiver()) properly fails when doing the same thing, so it applies only to this test. Hmm

let sub = client.add_event_receiver({
    let shared_sub = shared_sub.clone();
    move |event| {
        panic!();
    }
});

@Dampfwalze (Contributor, Author) commented Apr 14, 2024

> So far from what I've been able to gather, the test event_receiver_cancel_from_cb() still passes after a panic is introduced into the callback. The first one (event_receiver()) properly fails when doing the same thing, so it applies only to this test. Hmm

Yes, this is correct, and it makes sense: when the callback panics, the receiver used to cancel the callback is dropped and closed. If you then call cancel(), it returns false, which is actually the expected behavior, because the callback was already "canceled".

But when introducing a print statement, I can confirm that it is run.

@MolotovCherry (Owner) commented Apr 14, 2024

Give me a moment to track this original problem down. I'll reply back when I have more context.

On a side note though, I do agree that it might be best to handle panics somehow to make it more transparent. You asked earlier:

> Maybe EventsSubscription::cancel() could return a Result? What would the error type be then?

We could just use std::thread::Result, which is the same type that catch_unwind returns. I'm leaning towards not panicking the whole program, so cancel seems like as good a place as any to return it, I guess. It shouldn't be too much of a requirement that the cb is UnwindSafe, right? If there's a nicer way, I might prefer that. I'd still like to avoid a catch_unwind if possible.
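
A minimal sketch of that idea, catching the panic and surfacing it as std::thread::Result, the same type a JoinHandle::join would return (the function name is hypothetical, not the driver-ipc API):

```rust
use std::panic::{self, UnwindSafe};

// Run a callback, converting any panic into std::thread::Result.
fn call_capturing_panic<F>(cb: F) -> std::thread::Result<()>
where
    F: FnOnce() + UnwindSafe,
{
    panic::catch_unwind(cb)
}

fn main() {
    // Silence the default panic message for this demo.
    panic::set_hook(Box::new(|_| {}));

    // A well-behaved callback yields Ok(()).
    assert!(call_capturing_panic(|| println!("event handled")).is_ok());

    // A panicking callback is captured instead of silently killing the
    // task; cancel() could hand this Err back to the caller.
    let result = call_capturing_panic(|| panic!("callback blew up"));
    assert!(result.is_err());
    println!("panic was captured: {}", result.is_err());
}
```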

@MolotovCherry (Owner) commented Apr 14, 2024

If you'd like to test what I was talking about earlier, here's how (below). (Please see Edit 2)

Edit: Since the earlier steps were a bit roundabout, use this instead. This triggers it just the same. (The async version works fine.)

use driver_ipc::sync::DriverClient;

fn main() {
    let driver = DriverClient::new().unwrap();
    driver.add_event_receiver(|event| {
        // this does not print
        println!("{event:?}");
    });

    // go and send some events on the driver
    std::thread::sleep(std::time::Duration::from_secs(10));
    println!("shutting down");
}

Edit 2: Okay, I think we got it now. Not exactly surprising (given the behavior of receivers), but we need to add a comment explaining that the returned value needs to be saved and not dropped until done, otherwise the callback will stop firing. Not binding the value to something will also cause a drop before the end of scope. Though obvious, it's an easy gotcha people may accidentally hit.

Everything looking good! 👍🏻

@Dampfwalze (Contributor, Author)

> Edit 2: Okay, I think we got it now. Not exactly surprising (given the behavior of receivers), but we need to add a comment explaining that the returned value needs to be saved and not dropped until done, otherwise the callback will stop firing. Not binding the value to something will also cause a drop before the end of scope. Though obvious, it's an easy gotcha people may accidentally hit.

This is not the behavior I expected nor wanted. I tracked the issue down and fixed it: if the subscription gets dropped, the callback will still run. This is in line with the behavior of thread::spawn or task::spawn; when you drop the JoinHandle, the thread/task is detached.
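
The std analogue of that drop-to-detach behavior can be seen in a small illustrative sketch (plain std threads, not the driver API):

```rust
use std::sync::mpsc;
use std::thread;

// Dropping a JoinHandle detaches the thread: it keeps running.
fn detached_thread_still_runs() -> String {
    let (tx, rx) = mpsc::channel();
    let handle = thread::spawn(move || {
        tx.send("still running after handle dropped".to_string()).unwrap();
    });
    drop(handle); // detach, analogous to dropping the subscription
    // The detached thread still delivers its message.
    rx.recv().unwrap()
}

fn main() {
    println!("{}", detached_thread_still_runs());
}
```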

@MolotovCherry (Owner) commented Apr 14, 2024

The last remaining unresolved comment is basically already taken care of, isn't it?

Basically, what I'm envisioning here is that read errors out, but it's then confusing to the library user on why they aren't receiving any messages (e.g. on their event stream).

As far as this, I can take care of handling the error and a callback notification system of sorts / letting user know they got disconnected.

Maybe sometime we can handle re-connections after the server disconnected, but we don't need to think about that right now (it's not like the client can't just recreate the driver instance anyways).

This is a non-issue unrelated to this PR (not sure if it matters to bother handling this, but even if I do, it's unrelated to this PR)

> The senders should be fine since the caller will receive errors if they try to send, but since we still hold onto a broadcast sender, the subscribers won't receive a None when the stream should be closed.
>
> I'm thinking that we should completely drop the broadcast sender on error to solve this.

Iirc, this is already fixed after you changed the Client to hold a receiver instead.


So I think the previous one can be marked as resolved, and we can merge this if you feel it's ready. (If you still have API input about the design of it, your thoughts are welcome, but I don't see this last one as a blocker.)

@Dampfwalze Dampfwalze marked this pull request as ready for review June 24, 2024 21:07
@MolotovCherry (Owner) commented Jun 24, 2024

LGTM! If you feel it's good to go, I'm fine merging it right now as-is. If you want to do any touch-ups, they can be done later in smaller PRs (this would also make it much easier to keep track of things).

@MolotovCherry MolotovCherry merged commit ab9d5a2 into MolotovCherry:master Jun 24, 2024
2 of 3 checks passed
@github-actions github-actions bot locked and limited conversation to collaborators Jun 24, 2024
@MolotovCherry (Owner)

Merged now. Thank you for the time you put into this contribution! <3

Repository owner unlocked this conversation Jun 24, 2024