[QDP] Double-buffered pinned I/O pipeline and faster Parquet decode #751
base: dev-qdp
Conversation
Thanks @400Ping for the patch!
My bad, just fixed it.
Force-pushed from 63ab994 to 755140f
@400Ping thanks for the patch!
left some comments
I think maybe we could add some unit tests for this.
We have 2 improvements in this PR. Based on the benchmark results, I suspect one of them may not be contributing to the speedup. What's your experience?
This reverts commit 3556b5a.
Force-pushed from 372a6c5 to b411dcf
I think both contribute. The second one is what @rich7420 and @guan404ming suggested: switching to a different decompression technique to improve performance. But I think the speedup mostly comes from the first one.
Just tested: the second one doesn't improve performance much, so I'm going to remove it.
Please fix the pre-commit error.
Done, @rich7420 PTAL.
Please fix pre-commit. I tested locally and got a 2.8% speedup on the Arrow IPC case.
Pull request overview
This PR introduces a double-buffered pinned host memory I/O pipeline to improve GPU data transfer performance. The key optimization is adding a reusable pool of pinned host buffers to eliminate repeated CUDA allocation/deallocation overhead in the streaming Parquet decode path.
- Implements `PinnedBufferPool` with automatic RAII-based buffer management
- Refactors `PipelineContext` to support multiple event slots for double-buffered synchronization
- Renames `PinnedBuffer` to `PinnedHostBuffer` for clarity
- Moves norm buffer allocation from per-pipeline to per-chunk
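For orientation, here is a minimal sketch of the pool and RAII handle described above, assuming std-library synchronization and a plain `Vec<f64>` standing in for the CUDA pinned allocation; the `new` constructor and anything not quoted in the review comments below are illustrative assumptions, not the PR's actual API.

```rust
use std::sync::{Arc, Condvar, Mutex};

// Stand-in for the real CUDA-pinned allocation (PinnedHostBuffer in this PR).
struct PinnedHostBuffer(Vec<f64>);

struct PinnedBufferPool {
    free: Mutex<Vec<PinnedHostBuffer>>,
    available_cv: Condvar,
}

struct PinnedBufferHandle {
    buffer: Option<PinnedHostBuffer>,
    pool: Arc<PinnedBufferPool>,
}

impl PinnedBufferPool {
    // Hypothetical constructor: pre-allocate `count` buffers of `elements` f64s each.
    fn new(count: usize, elements: usize) -> Arc<Self> {
        let buffers = (0..count)
            .map(|_| PinnedHostBuffer(vec![0.0; elements]))
            .collect();
        Arc::new(Self {
            free: Mutex::new(buffers),
            available_cv: Condvar::new(),
        })
    }

    // Blocks until a buffer is free. With two buffers this gives double buffering:
    // one chunk is staged on the host while the previous chunk's transfer is in flight.
    fn acquire(self: &Arc<Self>) -> PinnedBufferHandle {
        let mut free = self.free.lock().unwrap();
        loop {
            if let Some(buffer) = free.pop() {
                return PinnedBufferHandle {
                    buffer: Some(buffer),
                    pool: Arc::clone(self),
                };
            }
            free = self.available_cv.wait(free).unwrap();
        }
    }
}

impl Drop for PinnedBufferHandle {
    // RAII: dropping the handle returns the buffer to the pool and wakes a waiter,
    // so callers never free pinned memory themselves.
    fn drop(&mut self) {
        if let Some(buf) = self.buffer.take() {
            let mut free = self.pool.free.lock().unwrap();
            free.push(buf);
            self.pool.available_cv.notify_one();
        }
    }
}
```

With a pool of two buffers, the acquire/drop cycle per chunk is what eliminates the repeated CUDA pinned allocation and deallocation in the streaming Parquet decode path.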
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 9 comments.
| File | Description |
|---|---|
| qdp/qdp-core/src/gpu/buffer_pool.rs | New pinned host buffer pool with acquire/release semantics and automatic return-to-pool on drop |
| qdp/qdp-core/src/gpu/pipeline.rs | Extended PipelineContext to support multiple event slots; integrated pinned buffer pool; improved error handling with Result returns |
| qdp/qdp-core/src/gpu/memory.rs | Renamed PinnedBuffer to PinnedHostBuffer and added immutable slice accessor |
| qdp/qdp-core/src/lib.rs | Integrated buffer pool types; moved norm buffer allocation to per-chunk scope; updated pipeline event handling |
| qdp/qdp-core/src/gpu/mod.rs | Exposed new buffer_pool module and its public types |
| qdp/qdp-core/src/gpu/cuda_ffi.rs | Removed redundant cfg attribute (already applied at module level) |
| qdp/qdp-kernels/tests/amplitude_encode.rs | Refactored test loop to use idiomatic iterator pattern instead of direct indexing |
qdp/qdp-core/src/gpu/buffer_pool.rs
Outdated
```rust
let mut free = self.free.lock().unwrap();
loop {
    if let Some(buffer) = free.pop() {
        return PinnedBufferHandle {
            buffer: Some(buffer),
            pool: Arc::clone(self),
        };
    }
    free = self.available_cv.wait(free).unwrap();
```
Copilot AI (Jan 1, 2026)
The .unwrap() calls on mutex lock operations can cause panics if the mutex is poisoned. In a production system, poisoned mutex errors should be handled more gracefully, either by propagating the error or by documenting that panic behavior is intentional in these scenarios.
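A minimal sketch of one graceful alternative (a hypothetical helper, not part of the PR), which recovers the guard instead of panicking when the lock is poisoned:

```rust
use std::sync::{Mutex, MutexGuard};

// Hypothetical helper: a poisoned lock only means another thread panicked while
// holding it; the free-list it protects is still structurally valid, so recover it.
fn lock_unpoisoned<T>(m: &Mutex<T>) -> MutexGuard<'_, T> {
    m.lock().unwrap_or_else(|poisoned| poisoned.into_inner())
}
```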
qdp/qdp-core/src/gpu/buffer_pool.rs
Outdated
```rust
/// Returns `None` if the pool is currently empty; callers can choose to spin/wait
/// or fall back to synchronous paths.
pub fn try_acquire(self: &Arc<Self>) -> Option<PinnedBufferHandle> {
    let mut free = self.free.lock().unwrap();
```
Copilot AI (Jan 1, 2026)
The .unwrap() call on the mutex lock operation can cause a panic if the mutex is poisoned. Consider handling this error more gracefully or documenting the panic behavior.
qdp/qdp-core/src/gpu/buffer_pool.rs
Outdated
```rust
/// Number of buffers currently available.
pub fn available(&self) -> usize {
    self.free.lock().unwrap().len()
```
Copilot AI (Jan 1, 2026)
The .unwrap() call on the mutex lock operation can cause a panic if the mutex is poisoned. Consider handling this error more gracefully or documenting the panic behavior.
```rust
if chunk.len() > CHUNK_SIZE_ELEMENTS {
    return Err(MahoutError::InvalidInput(format!(
        "Chunk size {} exceeds pinned buffer capacity {}",
        chunk.len(),
        CHUNK_SIZE_ELEMENTS
    )));
}
```
Copilot AI (Jan 1, 2026)
This check is redundant because the iterator chunks() will never produce a chunk larger than CHUNK_SIZE_ELEMENTS. The check can be removed to simplify the code.
Suggested change: remove the size check shown above entirely.
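For reference, a quick check of the `chunks()` guarantee behind this suggestion (illustrative only):

```rust
// slice::chunks(n) only ever yields chunks of length <= n; only the final
// chunk may be shorter, so a "chunk too large" branch is dead code.
fn main() {
    let data = [0.0_f64; 10];
    assert!(data.chunks(4).all(|c| c.len() <= 4));
    assert_eq!(data.chunks(4).last().unwrap().len(), 2);
}
```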
```rust
let mut norm_buffer = self
    .device
    .alloc_zeros::<f64>(samples_in_chunk)
    .map_err(|e| {
        MahoutError::MemoryAllocation(format!(
            "Failed to allocate norm buffer: {:?}",
            e
        ))
    })?;
```
Copilot AI (Jan 1, 2026)
Moving the norm buffer allocation inside the per-chunk loop (line 331-339) reintroduces per-chunk allocation overhead that was previously avoided. The old code comment explicitly stated: "Reuse a single norm buffer across chunks to avoid per-chunk allocations" and warned that "per-chunk allocation + drop can lead to use-after-free when the next chunk reuses the same device memory while the previous chunk is still running." While the use-after-free concern is mitigated by proper stream synchronization, the per-chunk allocation overhead remains. Consider pre-allocating a single norm buffer sized for the maximum expected samples_in_chunk to improve performance.
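A minimal sketch of the suggested hoisting pattern, with a plain `Vec` standing in for the device allocation (`alloc_zeros`) and a placeholder norm computation; the function name `encode_chunks` and its signature are hypothetical:

```rust
// Allocate the norm buffer once, sized for the largest possible chunk, and reuse
// a prefix of it per chunk instead of allocating inside the loop.
fn encode_chunks(data: &[f64], chunk_size: usize) {
    let mut norm_buffer = vec![0.0_f64; chunk_size]; // one allocation, hoisted out
    for chunk in data.chunks(chunk_size) {
        let samples_in_chunk = chunk.len();
        let norms = &mut norm_buffer[..samples_in_chunk]; // reuse, no per-chunk alloc
        for (n, x) in norms.iter_mut().zip(chunk) {
            *n = x * x; // placeholder for the real per-sample norm computation
        }
        // ... copy `chunk` into the pinned buffer and launch the kernel with `norms` ...
    }
}
```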
qdp/qdp-core/src/gpu/buffer_pool.rs
Outdated
```rust
impl Drop for PinnedBufferHandle {
    fn drop(&mut self) {
        if let Some(buf) = self.buffer.take() {
            let mut free = self.pool.free.lock().unwrap();
```
Copilot AI (Jan 1, 2026)
The .unwrap() call on the mutex lock operation can cause a panic if the mutex is poisoned. Consider handling this error more gracefully or documenting the panic behavior.
Suggested change:
```diff
-let mut free = self.pool.free.lock().unwrap();
+let mut free = match self.pool.free.lock() {
+    Ok(guard) => guard,
+    Err(poisoned) => poisoned.into_inner(),
+};
```
```rust
    fn deref(&self) -> &Self::Target {
        self.buffer
            .as_ref()
            .expect("Buffer already returned to pool")
    }
}

#[cfg(target_os = "linux")]
impl std::ops::DerefMut for PinnedBufferHandle {
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.buffer
            .as_mut()
            .expect("Buffer already returned to pool")
```
Copilot AI (Jan 1, 2026)
The panic message "Buffer already returned to pool" may be misleading. This panic occurs when attempting to use a PinnedBufferHandle after it has been dropped and its buffer returned to the pool. Consider a more descriptive message such as "Attempted to use PinnedBufferHandle after buffer was returned to pool (use-after-drop)" to better indicate the programmer error.
I agree with the comment regarding the .unwrap() calls on the mutex locks. I think I will change the code to handle it more gracefully and add some comments to document this behavior.
Needs conflict resolution; overall looks good to me!
ryankert01 left a comment:
Looks good, some suggestions.
Purpose of PR
Related Issues or PRs
Closes #703
Changes Made
Breaking Changes
Checklist