Use dask to distribute particles spatially? #61

Open · angus-g opened this issue May 28, 2020 · 2 comments · May be fixed by #64
Labels: question (Further information is requested)

angus-g commented May 28, 2020

Parcels went with MPI for parallelisation, but this makes things pretty hard for us when performing advection. Previously, we would parallelise advection with OpenMP, so the workflow looked like:

  1. run main script as a single thread
  2. run advection, which automatically parallelises across available threads
  3. combine results into a dask array, so filtering can split across workers on available resources

However, with MPI, the workflow becomes more difficult. We have to run the main script on all available processors. Particle distribution then becomes automatic through Parcels, which detects that we're running under MPI and does a K-means distribution. This means we have to write an essentially arbitrary subset of particles from each processor back to a single file in a coherent way; we could leverage MPI to gather all particle data onto a single processor for writing (see the sketch below).
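
Purely as an illustration of that last idea (nothing here is provided by Parcels: ps is assumed to be each rank's ParticleSet after advection, var_U the sampled variable, and the output filename is made up), an mpi4py gather onto rank 0 could look like:

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD

# each rank holds an arbitrary (K-means-assigned) subset of the particles
local_u = ps.particle_data["var_U"]

# collect every rank's data on rank 0 (a list of arrays there, None elsewhere)
gathered = comm.gather(local_u, root=0)

if comm.Get_rank() == 0:
    # single coherent write from one processor
    np.save("var_U_gathered.npy", np.concatenate(gathered))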

An alternative, especially because we're already using dask, may be to leverage dask for the whole process. See for example this notebook: https://nbviewer.jupyter.org/gist/willirath/6b5c4654ca6be3774fa76acf4a266b96

angus-g commented Jun 2, 2020

Doing this can get a little tricky in terms of locks and synchronisation. I've been experimenting with creating a single FieldSet from a chunked dataset (partly because this gets around Parcels' opinionated chunking). Using the default scheduler, or an explicitly thread-based one (because the fieldset and its grids can't be serialised across processes), we can frame sampling as a gufunc over particle positions:

import dask.array as da
import numpy as np
import parcels

# data_in is the chunked input dataset and f is the filter object
# (holding the fieldset, particle class and sampling kernel)
chunks = data_in.UVEL.chunks[1:]
seeds = da.from_array(np.dstack(f._orig_grid), chunks + (None,), name="seeds")

@da.as_gufunc(signature="(i),()->()", output_dtypes=float)
def sample_section(arr, time):
    """Given an array (X x Y x 2) of lon and lat (indexed on the third dimension),
    seed and sample particles corresponding to those positions, at a given time."""

    shape = arr.shape[:2]
    ps = parcels.ParticleSet(
            f.fieldset,
            pclass=f.particleclass,
            lon=arr[...,0],
            lat=arr[...,1],
            time=time,
    )
    # execute the sample-only kernel to efficiently grab the initial condition
    ps.kernel = f.sample_kernel
    ps.execute(f.sample_kernel, runtime=0, dt=0)

    return ps.particle_data["var_U"].reshape(*shape)

There's still a little way to go on this. The next step might be to encapsulate the whole filtering process as an operation over a fieldset and an array of particle locations, returning a dataset of sampled and filtered data. Then we can let the scheduler distribute the work according to the chunked particle locations (assuming we don't hit a deadlock somewhere); a rough sketch of that interface is below.
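
As a very rough sketch of what that could look like (filter_particles is just a placeholder name; it relies on the sample_section gufunc and chunked seeds array from above, with the fieldset implicit through that closure, and the actual filtering step would still need to slot in after the sampling):

import xarray as xr

def filter_particles(seeds, time):
    """Sample the fieldset at chunked seed positions, returning a
    dask-backed Dataset; evaluation happens chunk-by-chunk."""
    sampled = sample_section(seeds, time)  # lazy gufunc application
    return xr.Dataset(
        {"var_U": (("x", "y"), sampled)},
        coords={
            "lon": (("x", "y"), seeds[..., 0]),
            "lat": (("x", "y"), seeds[..., 1]),
        },
    )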

angus-g commented Jun 29, 2020

We can use the same seeding strategy as above to get the seeds into a chunked dask array, then define a function to process a block, like:

import dask

def process_block(time, block, block_info=None):
    # this reads our input data as an xarray dataset
    data_in = load_data()
    f = create_filter(data_in)  # we can't share fields/grids/etc. between workers

    # advection for a single block should be single-threaded
    # otherwise we get some weird interactions or too much communication
    # between workers
    with dask.config.set(scheduler="single-threaded"):
        out_data = advect_block(f, time, block)

    # we need to get the indices of this block within the seed array
    # so that we can combine everything with xr.combine_by_coords afterwards
    # (this is mostly a provision for curvilinear grids)
    return filter_block(f, block, block_info[1]["array-location"][:2], out_data._vars)

This uses the filter_block function, which replicates a lot of the internal functionality provided by the library, but combines the results for all sampled variables for a single block into a single dataset. This way, we can glue all the blocks back together.

import bottleneck as bn
import numpy as np
import xarray as xr
from scipy import signal

def filter_block(f, block, block_idx, advection_data):
    if f.inertial_filter is None:
        f.inertial_filter = f._create_filter()

    # use indices of the original block to label coordinates
    # so we can use xarray.combine_by_coords
    ds_out = xr.Dataset(
        {},
        coords={
            "lon": (["x", "y"], block[..., 0]),
            "lat": (["x", "y"], block[..., 1]),
            "x": range(*block_idx[0]),
            "y": range(*block_idx[1]),
        }
    )

    def filter_select(x):
        ti = x.shape[1] // 2

        if f._min_window is not None:
            xn = np.isnan(x)
            is_valid = np.count_nonzero(~xn, axis=-1) >= f._min_window

            fwd = x[:, ti:]
            bkd = x[:, :ti+1]
            fwd = bn.push(fwd)
            bkd = bn.push(bkd[:, ::-1])[:, ::-1]  # back-fill: reverse time, push, reverse back

            x = np.hstack((bkd[:, :-1], fwd))
            x[~is_valid] = np.nan

        # filter along the time axis, then select the central (seeding) time
        return signal.filtfilt(*f.inertial_filter, x)[..., ti]

    for v, x in advection_data.items():
        ds_out[v] = (["x", "y"], filter_select(x).reshape(ds_out.x.size, ds_out.y.size))

    # present an array to dask so it can concat properly
    x_out = np.empty((1, 1), dtype=object)
    x_out[0, 0] = ds_out
    return x_out

With these pieces, the only thing left to do is to map our process_block function over the seed array. To take a small subset:

ds_out = da.map_blocks(process_block, t, seeds.blocks[:2,:2], dtype=object, drop_axis=2, chunks=(1,1))
ds_out = ds_out.compute(scheduler="processes", num_workers=4)
ds_out = xr.combine_by_coords(ds_out.flatten())

Notes

In this case, we're providing our input data as an xarray Dataset. This lets us chunk it correctly (and make other on-the-fly modifications, like masking land). To work efficiently, we don't want to load the entire dataset at once, but Parcels tries to do exactly that. The following changes treat our input as a "deferred grid", which loads only 3 timesteps at a time and also does spatial chunking. A usage sketch follows the patch below.

diff --git a/parcels/field.py b/parcels/field.py
index 1f02822a..92d85435 100644
--- a/parcels/field.py
+++ b/parcels/field.py
@@ -436,7 +436,7 @@ class Field(object):
 
     @classmethod
     def from_xarray(cls, da, name, dimensions, mesh='spherical', allow_time_extrapolation=None,
-                    time_periodic=False, **kwargs):
+                    time_periodic=False, deferred_load=False, **kwargs):
         """Create field from xarray Variable
 
         :param da: Xarray DataArray
@@ -467,6 +467,11 @@ class Field(object):
         time = time_origin.reltime(time)
 
         grid = Grid.create_grid(lon, lat, depth, time, time_origin=time_origin, mesh=mesh)
+
+        if deferred_load:
+            grid.defer_load = True
+            grid.ti = -1
+
         return cls(name, data, grid=grid, allow_time_extrapolation=allow_time_extrapolation,
                    interp_method=interp_method, **kwargs)
 
@@ -1089,9 +1094,9 @@ class Field(object):
         # 3: is loaded
         if isinstance(self.data, da.core.Array):
             for block_id in range(len(self.grid.load_chunk)):
-                if self.grid.load_chunk[block_id] == 1 or self.grid.load_chunk[block_id] > 1 and self.data_chunks[block_id] is None:
+                if self.grid.load_chunk[block_id] >= 1 or self.grid.load_chunk[block_id] > 1 and self.data_chunks[block_id] is None:
                     block = self.get_block(block_id)
-                    self.data_chunks[block_id] = np.array(self.data.blocks[(slice(self.grid.tdim),) + block])
+                    self.data_chunks[block_id] = np.array(self.data.blocks[(slice(self.grid.ti, self.grid.ti + self.grid.tdim),) + block])
                 elif self.grid.load_chunk[block_id] == 0:
                     if isinstance(self.data_chunks, list):
                         self.data_chunks[block_id] = None
@@ -2201,7 +2206,7 @@ class NetcdfFileBuffer(object):
                             self.chunking_finalized = True
                     else:
                         # ==== I think this can be "pass" too ==== #
-                        data = data.rechunk(self.chunk_mapping)
+                        #data = data.rechunk(self.chunk_mapping)
                         self.chunking_finalized = True
             else:
                 da_data = da.from_array(data, chunks=self.field_chunksize)
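
For reference, a sketch of how the patched from_xarray could then be used when building fields from our dataset (the dimension names in the dimensions mapping are guesses for our data, not anything Parcels prescribes):

import parcels

u = parcels.Field.from_xarray(
    data_in.UVEL, "U",
    dimensions={"time": "time", "lat": "lat", "lon": "lon"},  # assumed names
    deferred_load=True,  # keyword added by the patch above
)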

We do need to reset the chunk status between the forward and backward advection passes:

for g in f.fieldset.gridset.grids:
    g.load_chunk[:] = 0
