Skip to content

Functional-style data processing parallelized in shared memory

License

Notifications You must be signed in to change notification settings

European-XFEL/pasha

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

32 Commits
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

pasha

pasha (parallelized shared memory) provides tools to process data in parallel with an emphasis on shared memory and zero copy. It uses the map pattern similar to Python's builtin map() function, where a callable is applied to many elements in a collection. To avoid the high cost of IPC or other communication schemes, the results are meant to be written directly to memory shared between all workers as well as the calling site. The current implementations cover distribution across threads and processes on a single node.

Quick guide

To use it, simply import it, define your kernel function of choice and map away!

import numpy as np
import pasha as psh

# Get some random input data
inp = np.random.rand(100)

# Allocate output array via pasha. The returned array is
# guaranteed to be accessible from any worker, and may
# reside in shared memory.
outp = psh.alloc(like=inp)

# Define a kernel function multiplying each value with 3.
def triple_it(worker_id, index, value):
    outp[index] = 3 * value

# Map the kernel function.
psh.map(triple_it, inp)

# Check the result
np.testing.assert_allclose(outp, inp*3)

The runtime environment is controlled by a map context. The default context object is ProcessContext, which uses multiprocessing.Pool to distribute the work across several processes. This context only works on *nix systems supporting the fork() system call, as it expects any input data to be shared. When the process context is selected, psh.alloc() creates arrays in shared memory, so workers can write output data there and the caller can retrieve it with no memory copies.

You may either create an explicit context object and use it directly or change the default context, e.g.

psh.set_default_context('threads', num_workers=4)

There are three different context types builtin: serial, threads and processes.

The input array passed to map() is called a functor and is automatically wrapped in a suitable Functor object, here SequenceFunctor. This works for a number of common array and sequence types, but you may also implement your own Functor object to wrap anything else that can be iterated over.

For example, this is used to provide tight integration with EXtra-data, a toolkit used to access scientific data recorded at European XFEL. With this, you can map over DataCollection and KeyData objects to parallelize your data analysis.

def analysis_kernel(worker_id, index, train_id, data):
    # Do something with the data and save it to shared memory.

run = extra_data.open_run(proposal=700000, run=1)
psh.map(analysis_kernel, run[source, key])

About

Functional-style data processing parallelized in shared memory

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages