[WIP] Implement IPC for pyspark. #11564
base: branch-22.10
Conversation
{
  if (res != CUDA_SUCCESS) {
    char const* msg;
    if (cuGetErrorString(res, &msg) != CUDA_SUCCESS) {
Just FYI, calls to the CUDA driver library will have to be via dlopen after #11370.
Thanks for the reference, will look into creating a dlopen wrapper.
This might be a little bit trickier than I thought. With dlopen, I need to initialize the CUDA context myself.
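For context, a minimal sketch of what a dlopen-based driver wrapper could look like, restricted to the calls touched in this diff; the soname, the symbol set, and the error handling here are illustrative assumptions, not the design this PR will necessarily use:

```cpp
// Sketch of a dlopen-based wrapper for CUDA driver calls (illustrative only).
// Resolves the driver entry points at runtime instead of linking against libcuda.
#include <cuda.h>
#include <dlfcn.h>
#include <stdexcept>

struct cuda_driver {
  CUresult (*cuInit)(unsigned int)                     = nullptr;
  CUresult (*cuGetErrorString)(CUresult, char const**) = nullptr;

  cuda_driver()
  {
    handle_ = dlopen("libcuda.so.1", RTLD_NOW | RTLD_LOCAL);
    if (handle_ == nullptr) { throw std::runtime_error("failed to dlopen libcuda.so.1"); }
    cuInit           = reinterpret_cast<decltype(cuInit)>(dlsym(handle_, "cuInit"));
    cuGetErrorString = reinterpret_cast<decltype(cuGetErrorString)>(dlsym(handle_, "cuGetErrorString"));
    // cuInit only initializes the driver API. A CUDA context still has to be
    // current on the calling thread before memory-related driver calls (e.g. the
    // runtime's primary context, or one retained via cuDevicePrimaryCtxRetain),
    // which is the extra step mentioned in the comment above.
    if (cuInit == nullptr || cuInit(0) != CUDA_SUCCESS) {
      throw std::runtime_error("failed to initialize the CUDA driver");
    }
  }

  ~cuda_driver()
  {
    if (handle_ != nullptr) { dlclose(handle_); }
  }

 private:
  void* handle_ = nullptr;
};
```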
I'm trying to wrap my head around the big picture of why this code is needed in the first place.
There is a lot of new code here for IPC, but we've been supporting CUDA IPC for years already in Python/UCX without needing any of the code here. So what has changed?
This is for an optimization of pyspark user-defined functions. When running a UDF, Spark needs to copy the data from device to host and from the Java process to the Python process. But both processes are using the same device, hence we want to bypass all the copies to save as much memory and time as possible. Ideally we would patch Arrow to handle these requirements, but that involves a long chain of actions (DataFrame, Arrow table, batches, CUDA buffer, IPC handles, then another sequence of chunked readers) and a couple of data copies to achieve the transfer. Also, Arrow GPU would be required.
Nothing about Arrow should require data copies here unless you want to use the Arrow IPC format. A libcudf table should be able to be reinterpreted as an Arrow RecordBatch zero copy. Regardless, if you want to IPC each buffer separately, it doesn't matter whether you have a libcudf table or an Arrow RecordBatch: there isn't existing machinery to do this, nor does it probably make sense, as the overhead of opening all of the IPC handles to different buffers (we can't assume you have a memory pool, or that all buffers are backed by a single allocation in a pool) would be pretty significant.
Hi @trivialfis - before making further progress here, could we please conclude the discussion in #11514? My sense is that UCX can help abstract away some of the details relating to CUDA IPC, and could present an alternative solution to supporting IPC with pyspark. If not, we can always come back to this PR and work to get it merged.
@kkraus14 Thank you for the comment.
Yes, I want to use the IPC format along with Arrow functions to read and write it. This way I can avoid reinventing the wheel, as Arrow is the underlying format used by Spark/pyspark during the transfer.
The problem is mostly memory consumption and device <-> host memory copies. From benchmark results by our Spark team, the transfer time can be quite significant: 36 million rows with 37 columns of ints and floats (4 bytes per entry) takes 70 to 80 seconds. In comparison, using CUDA IPC handles is much cheaper and scales better with data size. As for the memory usage concern, that's mostly about using pyspark with XGBoost. In the past, memory usage has been the primary issue we are trying to address, so every single removed data copy is deeply appreciated.
UCX is a pretty large requirement for what really is the transfer of metadata that is opaque to cuDF and a single IPC handle. In addition to that, it imposes requirements and extra configs on the user, especially in Java land, where we have to pass environment variables to make sure it won't crash when loading (we need to set UCX_ERROR_SIGNALS to empty, or else the JVM's use of signals collides with UCX's error handlers, which leads to segfaults pretty regularly).
Maybe I am confused, but shouldn't this code call pack/unpack? @trivialfis did you try this already? It also means that ideally we shouldn't need to change cuDF at all: the thing that sends/receives this metadata buffer and the serialized IPC handle treats them as opaque data.
@wbo4958 tried pack and unpack without the RMM fix. I went with this per-column IPC to avoid the data concatenation and copy, due to memory constraints with XGBoost (an ML library that requires the full dataset to be available in GPU memory).
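For reference, a rough sketch of the pack/unpack route discussed above, assuming cudf::pack / cudf::unpack from <cudf/contiguous_split.hpp> (this is not what the PR implements); it concatenates all column buffers into one device allocation, which is exactly the copy the per-column approach tries to avoid:

```cpp
// Sketch of the pack/unpack route: serialize a table into one contiguous device
// buffer plus a host metadata blob, so a single IPC handle can cover everything.
#include <cudf/contiguous_split.hpp>
#include <cudf/table/table_view.hpp>
#include <cstdint>

// Producer side: copies all column buffers into a single device allocation.
cudf::packed_columns serialize(cudf::table_view const& input)
{
  return cudf::pack(input);  // packed_columns holds host metadata + one device buffer
}

// Consumer side: reconstruct a non-owning view over the received blobs.
// `metadata` (host) and `gpu_data` (device) are assumed to be the transferred buffers.
cudf::table_view deserialize(std::uint8_t const* metadata, std::uint8_t const* gpu_data)
{
  return cudf::unpack(metadata, gpu_data);
}
```

The upside of that route is that one IPC handle covers the whole table; the cost is a full-table copy on the producer side, which is what the memory constraint above rules out.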
Issue: #11514
Related: NVIDIA/spark-rapids#5561
This PR implements a pair of methods for exporting and importing the CUDA IPC memory handle. As described in the issue, this feature is mostly for pyspark, where data needs to be copied between two processes with different language environments.
This is still a work-in-progress PR seeking comments from maintainers. Please ignore unrelated changes in the build script.
In the implementation, I call into the CUDA driver API to work around the issue with IPC memory handles mentioned in NVIDIA/spark-rapids#5561 (comment). To briefly summarize the issue with RMM: the CUDA IPC functions return the same IPC handle for any memory region that shares the same base pointer (the pointer returned by cudaMalloc). For instance, ptr and ptr + 4 have the same IPC memory handle, which breaks the pool allocator. In this PR, I calculate the offset upon creating the IPC handle and export it as part of the IPC message. Lastly, the IPC message in the current implementation is simply a binary buffer instead of a more complicated JSON document.
We discussed the possibility of using Arrow IPC. It doesn't work for our purpose for a couple of reasons. Firstly, Arrow doesn't handle the pointer offset described above. Secondly, this PR ensures that there's no data copy inside C++, which cannot easily be done with Arrow structures like a record batch.
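To make the offset workaround concrete, here is an illustrative sketch of the export/import steps; the function names, the struct layout, and the omitted error handling are assumptions for illustration, not the API added by this PR (and the driver call would go through the dlopen wrapper discussed above):

```cpp
// Sketch of exporting/importing an interior device pointer as (IPC handle, offset).
#include <cuda.h>
#include <cuda_runtime_api.h>
#include <cstddef>
#include <cstdint>

struct ipc_export {
  cudaIpcMemHandle_t handle;  // handle for the allocation's *base* pointer
  std::size_t offset;         // where this buffer starts inside that allocation
};

// Export: with a pool allocator, `data` is usually an interior pointer into a
// larger cudaMalloc'd allocation, so the handle alone is ambiguous.
// Error checking omitted for brevity.
ipc_export export_buffer(void const* data)
{
  CUdeviceptr base{};
  std::size_t size{};
  // Driver API: find the base address of the allocation containing `data`.
  cuMemGetAddressRange(&base, &size, reinterpret_cast<CUdeviceptr>(data));

  ipc_export out{};
  out.offset = reinterpret_cast<std::uintptr_t>(data) - static_cast<std::uintptr_t>(base);
  cudaIpcGetMemHandle(&out.handle, reinterpret_cast<void*>(base));
  return out;
}

// Import: map the whole allocation in the receiving process, then apply the offset.
void const* import_buffer(ipc_export const& in)
{
  void* base{};
  cudaIpcOpenMemHandle(&base, in.handle, cudaIpcMemLazyEnablePeerAccess);
  return static_cast<std::uint8_t const*>(base) + in.offset;
}
```

The importing process is responsible for eventually calling cudaIpcCloseMemHandle on the mapped base pointer once it is done with the buffer.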
Supported features
to-do