-
Notifications
You must be signed in to change notification settings - Fork 199
[AURON #1693] split one large batch to many small batch after sort merge join #1694
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -141,6 +141,89 @@ impl ExecutionContext { | |
| .counter(name.to_owned(), self.partition_id) | ||
| } | ||
|
|
||
| pub fn split_with_default_batch_size( | ||
| self: &Arc<Self>, | ||
| input: SendableRecordBatchStream, | ||
| ) -> SendableRecordBatchStream { | ||
| struct SplitLargeBatchStream { | ||
| input: SendableRecordBatchStream, | ||
| current_batch: Option<RecordBatch>, | ||
| current_offset: usize, | ||
| } | ||
|
|
||
| impl SplitLargeBatchStream { | ||
| fn split_next_chunk(&mut self) -> Option<RecordBatch> { | ||
| let batch = self.current_batch.as_ref()?; | ||
| let target_batch_size = batch_size(); | ||
| let num_rows = batch.num_rows(); | ||
|
|
||
| if self.current_offset >= num_rows { | ||
| self.current_batch = None; | ||
| return None; | ||
| } | ||
|
|
||
| let chunk_size = std::cmp::min(target_batch_size, num_rows - self.current_offset); | ||
| let chunk = batch.slice(self.current_offset, chunk_size); | ||
| self.current_offset += chunk_size; | ||
|
|
||
| if self.current_offset >= num_rows { | ||
| self.current_batch = None; | ||
| } | ||
|
|
||
| Some(chunk) | ||
| } | ||
| } | ||
|
|
||
| impl RecordBatchStream for SplitLargeBatchStream { | ||
| fn schema(&self) -> SchemaRef { | ||
| self.input.schema() | ||
| } | ||
| } | ||
|
|
||
| impl Stream for SplitLargeBatchStream { | ||
| type Item = Result<RecordBatch>; | ||
|
|
||
| fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { | ||
| loop { | ||
| if let Some(chunk) = self.split_next_chunk() { | ||
| return Poll::Ready(Some(Ok(chunk))); | ||
| } | ||
|
|
||
| match ready!(self.input.as_mut().poll_next_unpin(cx)) { | ||
| Some(Ok(batch)) => { | ||
| if batch.is_empty() { | ||
| continue; | ||
| } | ||
|
|
||
| let target_batch_size = batch_size(); | ||
| if target_batch_size == 0 { | ||
| return Poll::Ready(Some(Err(DataFusionError::Internal( | ||
| "Invalid batch size: 0".to_string(), | ||
| )))); | ||
| } | ||
|
|
||
| let num_rows = batch.num_rows(); | ||
| if num_rows <= target_batch_size { | ||
| return Poll::Ready(Some(Ok(batch))); | ||
| } else { | ||
| self.current_batch = Some(batch); | ||
| self.current_offset = 0; | ||
| } | ||
| } | ||
| Some(Err(e)) => return Poll::Ready(Some(Err(e))), | ||
| None => return Poll::Ready(None), | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| Box::pin(SplitLargeBatchStream { | ||
| input, | ||
| current_batch: None, | ||
| current_offset: 0, | ||
| }) | ||
| } | ||
|
Comment on lines
+144
to
+225
|
||
|
|
||
| pub fn coalesce_with_default_batch_size( | ||
| self: &Arc<Self>, | ||
| input: SendableRecordBatchStream, | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
|
|
@@ -205,7 +205,8 @@ impl SortMergeJoinExec { | |||||||
| .sub_duration(poll_time.duration()); | ||||||||
| }) | ||||||||
| }); | ||||||||
| Ok(exec_ctx.coalesce_with_default_batch_size(output)) | ||||||||
| Ok(exec_ctx | ||||||||
| .coalesce_with_default_batch_size(exec_ctx.split_with_default_batch_size(output))) | ||||||||
|
Comment on lines
+208
to
+209
|
||||||||
| Ok(exec_ctx | |
| .coalesce_with_default_batch_size(exec_ctx.split_with_default_batch_size(output))) | |
| Ok(exec_ctx.split_with_default_batch_size(output)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
split_with_default_batch_sizemethod lacks documentation. Consider adding a doc comment that explains:batch_size())coalesce_with_default_batch_sizeExample: