Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dask task graphs are huge (non-performant) for large data #48

Open
GFleishman opened this issue Jul 19, 2023 · 11 comments
Open

Dask task graphs are huge (non-performant) for large data #48

GFleishman opened this issue Jul 19, 2023 · 11 comments

Comments

@GFleishman
Copy link

GFleishman commented Jul 19, 2023

I think this will be a tougher issue to solve than typos, math bugs, and formatting; so it will warrant some conversation with @thewtex and @tbirdso.

I have ngff-zarr running well on small in-memory sized datasets (e.g. 512x512x512 voxels). When formatted with the nested directory style dimension separator these files open just fine with the napari-ome-zarr plugin (just my first check for compatibility with other tools - I'll use other visualization software later as well). So my next task was to run on real data.

My real image is {'z': 1596, 'y': 8983, 'x': 5451} voxels and I am attempting the following:

ngff_image = to_ngff_image(
    my_big_zarr_array,
    dims=('z', 'y', 'x'),
    scale={a:b for a, b in zip('zyx', my_voxel_spacing)},
    axes_units={a:'micrometer' for a in 'zyx'},
)

multiscales = to_multiscales(
    ngff_image,
    scale_factors=[{'z':4, 'y':8, 'x':8}],
    chunks=(64, 128, 128),
)

to_ngff_zarr(
    './my_data_as_ome_ngff_zarr.zarr',
    'multiscales,
)

Importantly, this does actually work. If I wait long enough I get the file I want. However there is a performance issue that IMO will be lethal for the package if not resolved. For this (4, 8, 8) downsample level, the task graph has about 2.5M tasks. The dask distributed scheduler really can't handle graphs of this size well, which it acknowledges with a warning that the task graph itself is about 220MB when compressed, which it knows is huge, and says it will cause long slow downs.

It takes about 45 minutes for the scheduler to finish parsing all those tasks (just about right with the 2.5M tasks and the documented 0.001 seconds per task), and during this time the workers are allocated but idle. And this has to happen for every scale level. In a real scenario I might have 10 or more scale levels I want to create. So that's a huge amount of time to be paying for cpus that are just waiting for the scheduler to finish parsing the task graph. Making matters worse, this is actually only a small to medium size image. We could be running this on things that are much much bigger.

I feel that there are three different options for approaching this problem:
(1) small hacks, maybe something will help
(2) optimize the task graph by reworking the series of dask array operations to simplify simplify simplify
(3) abandon dask in as many places as possible

I tried three different things for (1):
(a) ran dask.array.optimize on the big graph (only eliminated about 10K tasks and at the expense of running this function which was long)
(b) forced even in-memory size scale levels to run through the serialization stuff (resulted in 6 task graphs each about 500K tasks, so the total time when breaking the problem up was even longer)
(c) forced every scale to run through this "Minimize task graph depth" block (just resulted in the same graph as before)

So now I want to decide between options (2) and (3). I'd love for there to be a solution with (2). It would be great to stay consistent with the sort of intended dask workflow and also to not have to refactor your package too much. But while building my package bigstream, and running into the same kind of huge task graph problems, I ultimately settled on (3) and everything got really performant really fast. After all, we're just doing a bunch of smoothing and downsampling here. It's not rocket science. Having all of that be consistent with dask.array and coordinated (or micromanaged) by a scheduler is nice, but not if it turns the program into a bureaucracy.

So, sorry for the long message, but I'd love to discuss this with you guys to determine where to go next.

@thewtex
Copy link
Owner

thewtex commented Jul 24, 2023

@GFleishman thanks for reporting this and the discussion!

You are right -- dask requires that the task graph it is fed is not too large, too deep, and contains a topology that can parallelize well to work with large data in practice.

I do have a local WIP branch that improves current situation. After complete, I would love to get your feedback on whether it helps with your issues and brainstorm together on what we can do better.

@GFleishman
Copy link
Author

@thewtex I'd be happy to help if you want to commit any of that branch. Your understanding of all the dask features is probably better than mine, since with bigstream I basically threw in the towel for a lot of these cases by avoiding the scheduler and having my functions load their own data/overlaps and things like that. But I can at least test in parallel with your development and try to optimize with you.

@toloudis
Copy link

Has there been any progress on this issue? I am now trying a similar sequence of functions starting with a dask delayed array that I am stacking together from several images. The total size is approximately T=500, Z=150, X=2000, Y=2000. My goal is to generate a zarr that is chunked per XY slice. In fact, ideally I want to chunk single slices at full res, then 4 slices when xy is scaled down by half, etc. But I'd settle for one chunk = one slice.

With the current code, I am hitting this error:

    multiscales = to_multiscales(
                  ^^^^^^^^^^^^^^^
RuntimeError: Cannot store into in memory Zarr Array using the distributed scheduler.

so in my case it's not even getting as far as starting to set up the downsampling task graph.

@thewtex
Copy link
Owner

thewtex commented Jan 24, 2024

Hi @toloudis , I am taking steps to first make generation faster, then come back to this.

That said, I think the existing code should be able to process your dataset.

RuntimeError: Cannot store into in-memory Zarr Array using the distributed scheduler.

Are you using the CLI? Or the API but writing into a zarr memory store? The CLI by default does not load the data into a zarr memory store. It incrementally writes to the disk according to what can be stored in memory. It will run in multiple stages, e.g., if your data is stored as 2000 by 2000 slices, it will first chunk the slice into smaller chunks, loading as many slices as possible without running out of memory, then create 3D chunks from the smaller pieces. The optimization I referenced in this issue does more of these incremental steps.

With the CLI, the flags --chunks flag enables specifying the chunk dimension, i.e. to specify XY chunks of a certain size. Also relevant are --memory-target, --local-cluster flags.

@toloudis
Copy link

I more or less ported what the cli is doing into my own script because the source data has to be assembled by python code.
I am using a LocalCluster very much the same as your code does. (Incidentally, your code has memory_target as a LocalCluster param and the correct name is memory_limit, at least when I tried it on my workstation)

Here's my code:

    ngff_image = to_ngff_image(data,
                               dims= ("t", "c", "z", "y", "x"),
                               scale = {"c":1, "t":1, "x":im3.physical_pixel_sizes.X, "y":im3.physical_pixel_sizes.Y, "z":im3.physical_pixel_sizes.Z},
                               translation = None,
                               name = "image",
                               axes_units = None
    )
    multiscales = to_multiscales(
        ngff_image,
        method=Methods.DASK_IMAGE_NEAREST,
        progress=rich_dask_progress,
        chunks=zarr_chunk_dims[0],
        cache= (ngff_image.data.nbytes > config.memory_target),
        scale_factors=[{"x":2, "y":2, "z":1, "t":1, "c":1}]
    )
    output_store = DirectoryStore(f"s3://{output_bucket}/{output_filename}.zarr", dimension_separator="/")
    to_ngff_zarr(
        output_store,
        multiscales,
        progress=rich_dask_progress
    )

My input data is a stacked dask delayed array cobbled together from several images.
The output chunks are intended to be of shape (t:1, c:1, z:1, ysize, xsize) which is one slice per chunk.

@thewtex
Copy link
Owner

thewtex commented Jan 25, 2024

Thanks for the details, looks good.

A couple things to try,

  • specify chunks in to_multiscales to your desired chunking, i.e. t, c, z are 1 -- currently, they are all zarr_chunk_dims[0]
  • a different method
  • not using LocalCluster

If it is a label image, #61 will (todo) be adding a ITKWASM_LABEL_IMAGE method that will have fewer artifacts and possibly better performance.

@thewtex
Copy link
Owner

thewtex commented Jan 25, 2024

Incidentally, your code has memory_target as a LocalCluster param and the correct name is memory_limit, at least when I tried it on my workstation)

I have a hard time finding this, at least with git grep memory_limit -- is there a file and line number?

@GFleishman
Copy link
Author

My two cents - I don't think this new discussion is related to the nominal issue of this thread (yet). @toloudis based on the RuntimeError you're getting it looks to me like dask thinks you want the reconstructed ome-ngff-zarr object returned in memory, instead of written to disk. Overall your array is bigger than the one I wrote this issue about - so even if you get past your RuntimeError you will still need to wait a long time for dask to parse the task graph before any workers actually start creating the new format data (though it also depends on the output chunk size, not just the input data size), but if you wait long enough it will work. I suggest you continue working with a LocalCluster, since that means you probably own the machine which is doing the work. That way you're not paying for cloud compute or cluster chargebacks while the dask scheduler does its long think.

I haven't used s3, but just wondering if your local machine is trying to gather all of the output data in memory before it's then written to the DirectoryStore?

@toloudis
Copy link

Regarding memory_target:
https://github.com/thewtex/ngff-zarr/blob/main/ngff_zarr/cli.py#L267
According to the error I got, memory_target is not a recognized kwarg. But memory_limit is. (see https://distributed.dask.org/en/stable/api.html#distributed.LocalCluster)

And yes I agree my current issue is probably different than the original post here. Happy to move it to discussion or a new issue.

I am trying many different methods right now to make this work stably and fast. In particular, when I use ome-zarr-py (whose dask rescaling I actually worked on) I do have the very long initial stage of building a huge dask graph before the computation starts and it starts writing. I'm also experimenting with just writing my own code to loop over the large time series and serialize parts of the computation in my own way before handing things to dask. Currently not yet using all workers optimally but robustly working and flexible.

I also realized that zarr's DirectoryStore is not good for s3 writes -- FSStore is more appropriate. But haven't tried to re-run with that in place just yet.

@GFleishman
Copy link
Author

Ahh - I knew I recognized your avatar - it must have been from various ome repos I've lurked through.

...experimenting with just writing my own code to loop over the large time series and serialize parts of the computation in my own way before handing things to dask

That's the approach that I've taken with a lot of my dask work. The pain point I guess is that if the distributed package makes great improvements to graph parsing and this kind of overhead issue goes away, then I'll have to rewrite things back into the intended dask work flow, i.e. use collections without having to think about how they work under the hood. That's definitely possible, dask/distributed are great tools with great developer teams. One approach is to jump in and lend a hand and try to make improvements from within. But my job is more about delivering processed results to stake holders, so taking long periods to work on code infrastructure is hard to justify. So I work around when I have to work around; and often the work arounds are pretty simple.

For example, dask.array.map_overlap scales really poorly - for every block is spawns maybe ~100 tasks. So if you have a really large 3D or 4D dataset and you want to do an overlapping computation, you'll wait hours for the scheduler to parse the graph. On the other hand, just sending the coordinates of the block plus its overlaps to each worker and having the worker read data out of the store directly is just one task per block - super easy - and determining the block+overlap positions is simple.

@thewtex
Copy link
Owner

thewtex commented Jan 25, 2024

According to the error I got, memory_target is not a recognized kwarg. But memory_limit is. (see https://distributed.dask.org/en/stable/api.html#distributed.LocalCluster)

Thanks! Patched in #63

specify chunks in to_multiscales to your desired chunking, i.e. t, c, z are 1 -- currently, they are all zarr_chunk_dims[0]

I am interested to know whether this addresses the behavior?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants