83 changes: 83 additions & 0 deletions native-engine/datafusion-ext-plans/src/common/execution_context.rs
@@ -141,6 +141,89 @@ impl ExecutionContext
.counter(name.to_owned(), self.partition_id)
}

pub fn split_with_default_batch_size(
    self: &Arc<Self>,
    input: SendableRecordBatchStream,
) -> SendableRecordBatchStream {
Comment on lines +144 to +147

Copilot AI Dec 3, 2025

The split_with_default_batch_size method lacks documentation. Consider adding a doc comment that explains:

  • The purpose of this method (splitting large batches into smaller ones)
  • The behavior when batches are already smaller than the target size
  • The target batch size used (from batch_size())
  • The relationship with coalesce_with_default_batch_size

Example:

/// Splits large record batches into smaller batches with sizes not exceeding
/// the default batch size. Batches smaller than or equal to the target size
/// are passed through unchanged. Empty batches are filtered out.
///
/// This is typically used in combination with `coalesce_with_default_batch_size`
/// to normalize batch sizes in a stream.

    struct SplitLargeBatchStream {
        input: SendableRecordBatchStream,
        current_batch: Option<RecordBatch>,
        current_offset: usize,
    }

    impl SplitLargeBatchStream {
        fn split_next_chunk(&mut self) -> Option<RecordBatch> {
            let batch = self.current_batch.as_ref()?;
            let target_batch_size = batch_size();
            let num_rows = batch.num_rows();

            if self.current_offset >= num_rows {
                self.current_batch = None;
                return None;
            }

            let chunk_size = std::cmp::min(target_batch_size, num_rows - self.current_offset);
            let chunk = batch.slice(self.current_offset, chunk_size);
            self.current_offset += chunk_size;

            if self.current_offset >= num_rows {
                self.current_batch = None;
            }

            Some(chunk)
        }
    }

    impl RecordBatchStream for SplitLargeBatchStream {
        fn schema(&self) -> SchemaRef {
            self.input.schema()
        }
    }

    impl Stream for SplitLargeBatchStream {
        type Item = Result<RecordBatch>;

        fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
            loop {
                if let Some(chunk) = self.split_next_chunk() {
                    return Poll::Ready(Some(Ok(chunk)));
                }

                match ready!(self.input.as_mut().poll_next_unpin(cx)) {
                    Some(Ok(batch)) => {
                        if batch.is_empty() {
                            continue;
                        }

                        let target_batch_size = batch_size();
                        if target_batch_size == 0 {
                            return Poll::Ready(Some(Err(DataFusionError::Internal(
                                "Invalid batch size: 0".to_string(),
                            ))));
                        }

                        let num_rows = batch.num_rows();
                        if num_rows <= target_batch_size {
                            return Poll::Ready(Some(Ok(batch)));
                        } else {
                            self.current_batch = Some(batch);
                            self.current_offset = 0;
                        }
                    }
                    Some(Err(e)) => return Poll::Ready(Some(Err(e))),
                    None => return Poll::Ready(None),
                }
            }
        }
    }

    Box::pin(SplitLargeBatchStream {
        input,
        current_batch: None,
        current_offset: 0,
    })
}
Comment on lines +144 to +225

Copilot AI Dec 3, 2025

The new split_with_default_batch_size method lacks test coverage. Consider adding unit tests to verify:

  • Splitting large batches (e.g., 10000 rows) into multiple smaller batches
  • Passing through batches already at or below target size
  • Handling empty batches
  • Error handling for zero batch size
  • Edge cases like batches with exactly target_batch_size rows

This is important to ensure the splitting logic works correctly, especially for the error case at line 199-202.

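The edge cases the review lists all come down to the offset arithmetic in split_next_chunk. A minimal, dependency-free sketch of that arithmetic (split_sizes is a hypothetical helper, not part of the PR; it only models how a batch of num_rows rows would be cut up):

```rust
/// Hypothetical helper mirroring the offset arithmetic in `split_next_chunk`:
/// returns the sizes of the chunks a batch of `num_rows` rows would be cut
/// into for a given `target_batch_size`.
fn split_sizes(num_rows: usize, target_batch_size: usize) -> Vec<usize> {
    let mut sizes = Vec::new();
    let mut offset = 0;
    while offset < num_rows {
        // Same arithmetic as the stream: never exceed the target, and the
        // final chunk takes whatever rows remain.
        let chunk = std::cmp::min(target_batch_size, num_rows - offset);
        sizes.push(chunk);
        offset += chunk;
    }
    sizes
}

fn main() {
    // A 10000-row batch with an 8192-row target splits into 8192 + 1808 rows.
    println!("{:?}", split_sizes(10000, 8192));
    // A batch already at or below the target passes through as one chunk.
    println!("{:?}", split_sizes(100, 8192));
    // A batch of exactly target_batch_size rows yields a single full chunk.
    println!("{:?}", split_sizes(8192, 8192));
}
```

A table-driven unit test over pairs like these would cover most of the cases the review asks for; the zero-target error path still needs its own test against the real stream.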

pub fn coalesce_with_default_batch_size(
    self: &Arc<Self>,
    input: SendableRecordBatchStream,
@@ -205,7 +205,8 @@ impl SortMergeJoinExec
        .sub_duration(poll_time.duration());
    })
});
-Ok(exec_ctx.coalesce_with_default_batch_size(output))
+Ok(exec_ctx
+    .coalesce_with_default_batch_size(exec_ctx.split_with_default_batch_size(output)))
Comment on lines +208 to +209

Copilot AI Dec 3, 2025

[nitpick] The composition coalesce_with_default_batch_size(split_with_default_batch_size(output)) may create unnecessary overhead. After splitting large batches to the target size, the coalesce operation will immediately try to merge them back if they're too small (less than 1/4 of batch_size based on line 275 of execution_context.rs).

Consider whether both operations are needed here, or if just split_with_default_batch_size would suffice for the sort merge join output. The coalesce operation is typically used for small batches from sources like filters or unions, but after splitting, batches should already be close to the target size.

Suggested change
-Ok(exec_ctx
-    .coalesce_with_default_batch_size(exec_ctx.split_with_default_batch_size(output)))
+Ok(exec_ctx.split_with_default_batch_size(output))

}
}

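The overhead the review describes is easy to see numerically. A hedged sketch (the quarter-of-target threshold is taken from the review's reading of execution_context.rs, and would_coalesce is a hypothetical helper, not Blaze code):

```rust
/// Hypothetical check mirroring the coalesce heuristic the review cites:
/// batches smaller than a quarter of the target size get buffered and merged.
fn would_coalesce(batch_rows: usize, target_batch_size: usize) -> bool {
    batch_rows < target_batch_size / 4
}

fn main() {
    let target = 8192;
    // Splitting a 10000-row batch yields chunks of 8192 and 1808 rows.
    // The 1808-row tail is below 8192 / 4 = 2048, so the downstream
    // coalesce would immediately try to merge it with later batches,
    // partially undoing the split that just happened.
    println!("{}", would_coalesce(1808, target));
    // A full-sized chunk passes straight through the coalesce stage.
    println!("{}", would_coalesce(8192, target));
}
```

Whether that round trip matters in practice depends on how often the join emits oversized batches; if tails are rare, keeping both stages is a safe default.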