Implement join_all and try_join_all on JoinSet #6664

Open
FalkWoldmann opened this issue Jun 27, 2024 · 3 comments
Labels
A-tokio Area: The main tokio crate C-feature-request Category: A feature request. M-task Module: tokio/task

Comments

FalkWoldmann commented Jun 27, 2024

Is your feature request related to a problem? Please describe.
The futures crate currently provides two very convenient functions for awaiting an Iterator of futures with join_all and try_join_all. However, these functions are currently missing in JoinSet, requiring us to loop over the set manually.

Describe the solution you'd like
I would like to see the addition of the aforementioned methods to JoinSet, as it would make its usage more ergonomic.

Additional context
I am not quite sure what the best method definition is, since every task spawned by the set could potentially fail with a JoinError. Here are some ideas I came up with:

join_all

Conservative

async fn join_all(mut self) -> Vec<Result<T, JoinError>>

This is the least opinionated approach, but also not that useful in my opinion. Granted, users could iterate again over the Vec, but then I am not really convinced by its utility over just constructing the initial while loop.
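To make the "iterate again over the Vec" point concrete, here is a small sketch of what a caller would do with the conservative return value. `FakeJoinError` is a stand-in invented for this example so that it runs without a runtime; it is not tokio's `JoinError`.

```rust
/// Stand-in for `tokio::task::JoinError` (hypothetical, for illustration only).
#[derive(Debug, PartialEq)]
struct FakeJoinError(&'static str);

/// Collapses the conservative `Vec<Result<T, _>>` into a single `Result`.
/// Collecting into `Result<Vec<T>, E>` stops at the first `Err` in the vector.
fn first_error_or_all<T>(
    results: Vec<Result<T, FakeJoinError>>,
) -> Result<Vec<T>, FakeJoinError> {
    results.into_iter().collect()
}
```

The second pass is a one-liner, but every caller has to write it, which is roughly the same effort as the original while loop.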

Filtered out JoinErrors

async fn join_all(mut self) -> Vec<T>

We could also just include all Ok variants, but this could be confusing because failed tasks are silently ignored.
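A minimal sketch of why this variant is risky, using plain `Result` values in place of real join results (the function name `keep_only_ok` is made up here):

```rust
/// Keeps only the `Ok` values and discards every error without a trace,
/// which is what a filtering `join_all` would effectively do.
fn keep_only_ok<T, E>(results: Vec<Result<T, E>>) -> Vec<T> {
    results.into_iter().filter_map(Result::ok).collect()
}
```

The caller receives a shorter vector and has no way to tell whether a task failed or was never spawned.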

Opinionated join_all

async fn join_all(mut self) -> Result<Vec<T>, JoinError>

Just short-circuit as soon as one task fails. This is a lot more useful, but it could be quite confusing for the user, since the behavior could also be read as that of a try_join_all.

try_join_all

impl<T: 'static, K: 'static> JoinSet<Result<T, K>> {
    async fn try_join_all(mut self) -> Result<Vec<T>, TryJoinAllError<K>> {}
}

I would also like to have a try_join_all that reacts to a user-provided Result, where TryJoinAllError could be either K or JoinError.

In addition, I am not quite sure how to handle the remaining tasks in case of a panic or error. Should we explicitly abort them, or just let them run? I would like to hear your thoughts on this.

@FalkWoldmann FalkWoldmann added A-tokio Area: The main tokio crate C-feature-request Category: A feature request. labels Jun 27, 2024
SmnTin commented Jun 28, 2024

join_all

  • Conservative. I agree with the reasoning against it. Its only benefit is that iterator combinators can be used on the returned vector. However, given that the try_{*} family of methods is still mostly unstable, this won't unlock any ergonomic opportunity. What's more, even with those methods, a better way, I believe, is to implement the Stream trait on JoinSet.
  • Filtered errors. This method should at least panic if an error is encountered. Otherwise, errors might easily go unnoticed, which is against the spirit of Rust!
  • Opinionated. I love this version the most. Again, if a user needs to act on each error, they can construct the manual loop.

try_join_all

Maybe return a nested result like Result<Result<Vec<T>, K>, JoinError> because one of the errors is "more local"?
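To compare the two shapes, here is a sketch that converts between a nested result and a flat error enum. It is generic over the join error type `J` so that it runs without tokio; the enum, the `Nested` alias, and the function names are all made up for this example.

```rust
/// Hypothetical flat error type: either a task's own error `K` or a
/// join failure `J` (standing in for tokio's `JoinError`).
#[derive(Debug, PartialEq)]
enum TryJoinAllError<K, J> {
    Err(K),
    JoinError(J),
}

/// Nested shape: the outer layer reports join failures, the inner layer
/// reports errors returned by the tasks themselves.
type Nested<T, K, J> = Result<Result<Vec<T>, K>, J>;

/// Nested -> flat.
fn flatten<T, K, J>(nested: Nested<T, K, J>) -> Result<Vec<T>, TryJoinAllError<K, J>> {
    match nested {
        Ok(Ok(values)) => Ok(values),
        Ok(Err(task_err)) => Err(TryJoinAllError::Err(task_err)),
        Err(join_err) => Err(TryJoinAllError::JoinError(join_err)),
    }
}

/// Flat -> nested.
fn nest<T, K, J>(flat: Result<Vec<T>, TryJoinAllError<K, J>>) -> Nested<T, K, J> {
    match flat {
        Ok(values) => Ok(Ok(values)),
        Err(TryJoinAllError::Err(task_err)) => Ok(Err(task_err)),
        Err(TryJoinAllError::JoinError(join_err)) => Err(join_err),
    }
}
```

The two forms carry the same information. The nested one lets a caller handle the outer join failure first (for example by resuming the panic) and then apply `?` to the inner result, while the flat one needs a single match.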

The idea of JoinSet

In my opinion, JoinSet is tokio's main primitive for structured concurrency:

  • It automatically aborts all its tasks on drop.
  • It allows monitoring child tasks for errors during execution.

I often find myself constructing the following loop to report errors and panics properly:

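// Assumes `use std::panic;` is in scope and the enclosing function returns a Result.
while let Some(join_result) = join_set.join_next().await {
    match join_result {
        // The task finished; propagate its own error with `?`.
        Ok(result) => result?,
        // The task panicked; re-raise the panic in the caller.
        Err(err) => panic::resume_unwind(err.into_panic()),
    }
}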

This snippet assumes the tasks are not canceled except on join_set's drop.

With the proposed try_join_all, this can be rewritten as:

match join_set.try_join_all().await {
    Ok(_) => Ok(()),
    Err(TryJoinAllError::Err(err)) => return Err(err),
    Err(TryJoinAllError::JoinError(err)) => panic::resume_unwind(err.into_panic()),
}

It better conveys the idea but could be more concise.

SmnTin commented Jun 28, 2024

Regarding this question:

In addition, I am not quite sure how to handle the remaining tasks in case of a panic or Error. Should we explicitly abort them, or just let them run? I would like to hear your thoughts on this.

All the methods you proposed consume the JoinSet, meaning it will be dropped, and the tasks will be aborted. As I have elaborated above, I like this behavior.

But I can see the confusion between the proposed try_join_all, which joins all the tasks and fails if any task returns an error, and the existing try_join_next, which joins a finished task if there is one and returns None otherwise.

@Darksonn Darksonn added the M-task Module: tokio/task label Jun 28, 2024
Contributor

Darksonn commented Jul 4, 2024

Hmm, I think it would make sense to have an opinionated join_all. It could even do something like this:

pub async fn join_all(mut self) -> Vec<T> {
    let mut output = Vec::new();

    while let Some(out) = self.join_next().await {
        match out {
            Ok(res) => output.push(res),
            // Re-raise a task's panic in the caller.
            Err(err) if err.is_panic() => panic::resume_unwind(err.into_panic()),
            // Any other join error (i.e. cancellation) also panics.
            Err(err) => panic!("{err}"),
        }
    }

    output
}

Since this takes self, the JoinSet is dropped when the function panics, so all remaining tasks are cancelled on error.


3 participants