
Performance when downloading files #57

Open
jeinwag opened this issue Mar 2, 2023 · 19 comments
@jeinwag commented Mar 2, 2023

I'm currently trying to use this crate in a small project to download a file via SFTP.
I don't know if I'm missing something, but with my naive implementation, downloading the file is several times slower than with the standard sftp client. Here's what I'm doing:

use bytes::{BytesMut};
use openssh::{KnownHosts, Session, Stdio};
use openssh_sftp_client::Sftp;
use std::error::Error;
use std::fs::File;
use std::io::Write;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let session = Session::connect_mux(
        "ssh://user@host",
        KnownHosts::Accept,
    )
    .await?;
    let mut child = session
        .subsystem("sftp")
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .spawn()
        .await?;

    let sftp_client = Sftp::new(
        child.stdin().take().unwrap(),
        child.stdout().take().unwrap(),
        Default::default(),
    )
    .await?;

    let mut zip_file = sftp_client
        .options()
        .read(true)
        .open("myfile")
        .await?;

    let mut tmp_file = File::create("myfile").unwrap();
    let buf_size = 100 * 1024 * 1024;

    while let Ok(read_result) = zip_file
        .read(
            buf_size,
            BytesMut::with_capacity(buf_size.try_into().unwrap()),
        )
        .await
    {
        if let Some(data) = read_result {
            tmp_file.write_all(&data)?;
        } else {
            break;
        }
    }

    Ok(())
}

Any hints are appreciated.

@jeinwag (Author) commented Mar 2, 2023

I just noticed that there's tokio::io::copy, but using it makes matters worse.

@NobodyXu (Member) commented Mar 3, 2023

Can you try creating the Sftp like this:

    let sftp_client = Sftp::new(
        child.stdin().take().unwrap(),
        child.stdout().take().unwrap(),
        SftpOptions::new().max_pending_requests(NonZeroU16::new(1).unwrap()),
    )
    .await?;

By default, Sftp tries to group requests and send them in a batch to improve throughput, but in a case where you only have one active request anyway, configuring it to flush immediately makes more sense.

@jeinwag (Author) commented Mar 3, 2023

That doesn't help, unfortunately.

To put things in perspective: When using the sftp command line client, I get transfer speeds of about 35-40 MB/s.
The Rust program's throughput is probably less than 5 MB/s.
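For an apples-to-apples comparison it may help to time the copy loop itself rather than eyeballing the transfer. A std-only sketch of such a measurement harness (the `measure_throughput` helper and the in-memory sink are illustrative, not part of either crate):

```rust
use std::io::Write;
use std::time::Instant;

// Hypothetical helper: feed fixed-size chunks to a writer and report MB/s.
// In the real program the chunks would come from the sftp read loop.
fn measure_throughput<W: Write>(mut sink: W, chunk: &[u8], chunks: usize) -> f64 {
    let start = Instant::now();
    for _ in 0..chunks {
        sink.write_all(chunk).unwrap();
    }
    let secs = start.elapsed().as_secs_f64();
    (chunk.len() * chunks) as f64 / (1024.0 * 1024.0) / secs
}

fn main() {
    // Smoke test of the harness itself: 10 MiB into an in-memory sink.
    let chunk = vec![0u8; 64 * 1024];
    let mbps = measure_throughput(std::io::sink(), &chunk, 160);
    println!("{:.1} MB/s", mbps);
}
```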

@NobodyXu (Member) commented Mar 3, 2023

Can you try replacing the I/O loop with this, in addition to the SftpOptions configuration:

    let mut tmp_file = File::create("myfile").unwrap();
    let mut buf = BytesMut::with_capacity(100 * 1024 * 1024);

    while let Some(data) = zip_file
        .read(buf.capacity(), buf)
        .await
        .transpose() // Return Result<Option<_>, _>
        .unwrap()     // Return Option<_>
    {
        buf = tokio::task::spawn_blocking(move || {
            tmp_file.write_all(&data).unwrap();
            data
        }).await.unwrap();
        buf.clear();
    }

The problems I've seen with this loop:

  • The BytesMut isn't reused; instead a new one is allocated from the heap every time
  • Writing to std::fs::File could block, so you need to use tokio::task::spawn_blocking
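Both points can be illustrated without tokio or the sftp client at all. A std-only sketch where a dedicated writer thread does the blocking writes (standing in for spawn_blocking) and each buffer is ping-ponged back to the reader for reuse, so no allocation happens per chunk (all names here are illustrative):

```rust
use std::sync::mpsc;
use std::thread;

// Std-only sketch of the pattern above: a dedicated writer thread does the
// blocking I/O while buffers are sent back to the reader for reuse.
fn copy_with_reused_buffer(chunks: usize, chunk_len: usize) -> usize {
    let (to_writer, writer_rx) = mpsc::channel::<Vec<u8>>();
    let (to_reader, reader_rx) = mpsc::channel::<Vec<u8>>();

    let writer = thread::spawn(move || {
        let mut written = 0;
        while let Ok(buf) = writer_rx.recv() {
            written += buf.len();        // stand-in for tmp_file.write_all(&buf)
            let _ = to_reader.send(buf); // hand the buffer back for reuse
        }
        written
    });

    let mut buf = vec![0u8; chunk_len]; // one buffer, reused every iteration
    for _ in 0..chunks {
        // stand-in for zip_file.read(...) filling `buf`
        to_writer.send(buf).unwrap();
        buf = reader_rx.recv().unwrap(); // wait until the writer is done with it
    }
    drop(to_writer); // close the channel so the writer thread exits
    writer.join().unwrap()
}

fn main() {
    println!("wrote {} bytes", copy_with_reused_buffer(8, 4096));
}
```

The ping-pong serializes read and write like the snippet above does; the later version in this thread pipelines them instead.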

@jeinwag (Author) commented Mar 3, 2023

I'm sorry, I can't get your suggested code to compile.

Please note I also tried using tokio::io::copy instead of my own loop, but it seems even slower.

@NobodyXu (Member) commented Mar 3, 2023

Please note I also tried using tokio::io::copy instead of my own loop, but it seems even slower.

Not sure that will even work considering you are using std::fs::File.

I'm sorry, I can't get your suggested code to compile.

Thanks, I've fixed the errors:

use std::{error::Error, fs::File, io::Write, num::NonZeroU16};

use bytes::{Bytes, BytesMut};
use openssh::{KnownHosts, Session, Stdio};
use openssh_sftp_client::{Sftp, SftpOptions};
use tokio::sync::mpsc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let session = Session::connect_mux("ssh://user@host", KnownHosts::Accept).await?;
    let mut child = session
        .subsystem("sftp")
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .spawn()
        .await?;

    let sftp_client = Sftp::new(
        child.stdin().take().unwrap(),
        child.stdout().take().unwrap(),
        SftpOptions::new().max_pending_requests(NonZeroU16::new(1).unwrap()),
    )
    .await?;

    let mut zip_file = sftp_client.options().read(true).open("myfile").await?;

    let (tx, mut rx) = mpsc::unbounded_channel::<Bytes>();
    let task = tokio::task::spawn_blocking(move || {
        let mut tmp_file = File::create("myfile")?;
        if let Some(data) = rx.blocking_recv() {
            tmp_file.write_all(&data)?;
        }

        tmp_file.flush()
    });

    let mut buf = BytesMut::with_capacity(8192);

    while let Some(data) = zip_file
        .read(1024, buf)
        .await
        .unwrap()
    {
        buf = data;
        // Reserve before splitting to increase the possibility of
        // existing buffer getting reused.
        buf.reserve(4096);

        if tx.send(buf.split().freeze()).is_err() {
            // The write task failed, break the loop
            // and retrieve the result of the task.
            break;
        }
    }

    task.await??;

    Ok(())
}

Edit:

I've made a few changes to the I/O loop so that it runs the file I/O and the sftp requests in parallel, which should be faster.

I've also adjusted the number of bytes to read in the I/O loop: each TCP packet can be at most 64 KiB (jumbo frame), but in practice it is more likely to be around 1.5 KiB, so setting the bytes to read to something that large doesn't make any sense and would only slow things down, since sftp now has to wait for several TCP packets and group them in memory instead of writing them out to the file immediately.
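A back-of-envelope sketch of that packet arithmetic (the 1460-byte payload figure is a typical MSS for a 1500-byte MTU, an assumption for illustration, not something the crate exposes):

```rust
// Rough payload bytes per TCP segment with a standard 1500-byte MTU
// (1500 minus typical IPv4 + TCP headers; illustrative figure).
const MSS: usize = 1460;

// Number of segments the sftp layer has to wait for before a read
// of `read_size` bytes can complete.
fn segments_per_read(read_size: usize) -> usize {
    (read_size + MSS - 1) / MSS
}

fn main() {
    for read_size in [1024usize, 32 * 1024, 100 * 1024 * 1024] {
        println!(
            "{:>12} B read -> ~{} segments",
            read_size,
            segments_per_read(read_size)
        );
    }
}
```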

@jeinwag (Author) commented Mar 3, 2023

Not sure that will even work considering you are using std::fs::File.

I did use openssh_sftp_client::file::TokioCompatFile when using tokio::io::copy

Thank you for the hints.
I tried your latest example, and unfortunately, it does not work for me. It only transfers 1 KB and then exits - looks like tx.send(buf.split().freeze()).is_err() returns true and therefore breaks the loop.

@NobodyXu (Member) commented Mar 3, 2023

I did use openssh_sftp_client::file::TokioCompatFile when using tokio::io::copy

I'm not referring to the source, but the destination which is std::fs::File.
I guess you probably use tokio::fs::File for this.

I tried your latest example, and unfortunately, it does not work for me. It only transfers 1 KB and then exits - looks like tx.send(buf.split().freeze()).is_err() returns true and therefore breaks the loop.

Hmmm if that is true, then it's possible writing to the file myfile somehow fails.

@jeinwag (Author) commented Mar 3, 2023

I guess you probably use tokio::fs::File for this.

Yes, I had to change it to use tokio::fs::File.

Hmmm if that is true, then it's possible writing to the file myfile somehow fails.

I modified the program to display the error message and all I'm getting is "channel closed", but I'm not sure why.

@jeinwag (Author) commented Mar 3, 2023

Ok, got it: if let Some(data) = rx.blocking_recv() { needs to be replaced with while let Some(data) = rx.blocking_recv() {

It does work now, but it still isn't faster.
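The `if let` vs `while let` pitfall above can be reproduced with a plain std channel (everything here is illustrative; the real code uses tokio's mpsc and blocking_recv):

```rust
use std::sync::mpsc;
use std::thread;

// Count chunks a writer thread receives before the channel closes.
// With `while let` it drains everything; an `if let` in its place would
// consume only the first chunk and return, dropping the receiver and
// making every later send() fail with "channel closed".
fn drain_all(chunks: Vec<Vec<u8>>) -> usize {
    let (tx, rx) = mpsc::channel::<Vec<u8>>();
    let writer = thread::spawn(move || {
        let mut received = 0;
        while let Ok(_data) = rx.recv() {
            received += 1; // stand-in for tmp_file.write_all(&_data)
        }
        received
    });
    let n = chunks.len();
    for chunk in chunks {
        tx.send(chunk).unwrap();
    }
    drop(tx); // every sender dropped -> recv() errors -> loop ends
    assert_eq!(writer.join().unwrap(), n);
    n
}

fn main() {
    println!("drained {} chunks", drain_all(vec![vec![0u8; 4]; 3]));
}
```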

@NobodyXu (Member) commented Mar 3, 2023

Ok, got it: if let Some(data) = rx.blocking_recv() { needs to be replaced with while let Some(data) = rx.blocking_recv() {

Aha sorry about this.

It does work now, but it still isn't faster.

There's no speedup at all?
I will have to look into this more closely tomorrow, but did you compile the program with --release?
You could also turn on lto = true to improve performance.

If none of this helps, then it could be that the sftp implementation itself isn't very efficient; that's the only thing I can think of.
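For reference, the --release/lto suggestion corresponds to a Cargo profile along these lines (a sketch; values illustrative):

```toml
# Cargo.toml -- then build with `cargo build --release`
[profile.release]
lto = true        # cross-crate link-time optimization
codegen-units = 1 # optional: better optimization at the cost of build time
```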

@jeinwag (Author) commented Mar 3, 2023

Still no performance improvement, unfortunately. I'm also thinking that it's some issue with the sftp implementation, but I know nothing about sftp protocol internals.

@NobodyXu (Member) commented Mar 5, 2023

Still no performance improvement, unfortunately. I'm also thinking that it's some issue with the sftp implementation, but I know nothing about sftp protocol internals.

Maybe you can do a profile and then use a tool like cargo-flamegraph to print a flamegraph of the profile?
That would help me find the bottleneck.

@jeinwag (Author) commented Mar 6, 2023

Sure thing, here's a flamegraph generated during the download of a small 10 MB file. Please note that I had to add a drop(tx) to the code after the reading loop for the program to actually terminate.

[flamegraph SVG attachment]

@NobodyXu (Member) commented Mar 6, 2023

@jeinwag Thanks, I will have a look later.

P.S. it would be great if I could zoom in/out

@jeinwag (Author) commented Mar 6, 2023

@NobodyXu apparently the required JavaScript execution is being blocked, but zooming should work if you download the SVG and open it.

@NobodyXu (Member) commented Mar 6, 2023

Thanks, will download it and have a look.

@NobodyXu (Member) commented Apr 5, 2023

@jeinwag Sorry I've been busy with other projects.

I took a look at the flamegraph, and it seems that locking/syscalls actually take quite some time.

Maybe switching to a different kernel/filesystem would help?

I see that you are using ext4; maybe try using tmpfs to see if it would be faster?

Also try updating to the latest ssh and Linux kernel; that might help.

@NobodyXu (Member) commented May 2, 2023

@Xuanwo @silver-ymz Since you are using this crate in opendal, a crate designed for high-performance unified data access that is also used in sccache, I wonder whether you have encountered any performance issues like the one shown in this issue?

If you did, feel free to send me more data/info so that I can fix it.

I'd also welcome any patch that improves the performance/ergonomics of this crate.
