From caa10406c4edf2b3afe47de293811cff32db68da Mon Sep 17 00:00:00 2001 From: xorsum Date: Fri, 5 Dec 2025 14:58:43 +0800 Subject: [PATCH 1/7] join operation should flush in time on duplicated keys --- .../src/joins/smj/full_join.rs | 63 +++++-------------- 1 file changed, 17 insertions(+), 46 deletions(-) diff --git a/native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs b/native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs index 3f2e06773..2c8ba2812 100644 --- a/native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs +++ b/native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs @@ -131,66 +131,37 @@ impl Joiner for FullJoiner 1 { - self.as_mut().flush(cur1, cur2).await?; - cur2.clean_out_dated_batches(); - } - } - } - - if l_equal { - // stream left side - while !cur1.finished() && cur1.cur_key() == cur2.key(r_key_idx) { - for &ridx in &equal_rindices { - self.lindices.push(cur1.cur_idx()); - self.rindices.push(ridx); - } - cur_forward!(cur1); - if self.should_flush() || cur1.num_buffered_batches() > 1 { + for &lidx in &equal_lindices { + for &ridx in &equal_rindices { + self.lindices.push(lidx); + self.rindices.push(ridx); + if self.should_flush() { self.as_mut().flush(cur1, cur2).await?; - cur1.clean_out_dated_batches(); } } } From d1f5e9dd2235ba66bb149ddad79ba37cb2b13c9d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Baokun=20Han=20=EF=BC=88=E9=9F=A9=E5=AE=9D=E5=9D=A4?= =?UTF-8?q?=EF=BC=89?= Date: Sat, 6 Dec 2025 13:00:52 +0800 Subject: [PATCH 2/7] Revert "join operation should flush in time on duplicated keys" This reverts commit caa10406c4edf2b3afe47de293811cff32db68da. --- .../src/joins/smj/full_join.rs | 63 ++++++++++++++----- 1 file changed, 46 insertions(+), 17 deletions(-) diff --git a/native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs b/native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs index 2c8ba2812..3f2e06773 100644 --- a/native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs +++ b/native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs @@ -131,37 +131,66 @@ impl Joiner for FullJoiner 1 { + self.as_mut().flush(cur1, cur2).await?; + cur2.clean_out_dated_batches(); + } + } + } + + if l_equal { + // stream left side + while !cur1.finished() && cur1.cur_key() == cur2.key(r_key_idx) { + for &ridx in &equal_rindices { + self.lindices.push(cur1.cur_idx()); + self.rindices.push(ridx); + } + cur_forward!(cur1); + if self.should_flush() || cur1.num_buffered_batches() > 1 { self.as_mut().flush(cur1, cur2).await?; + cur1.clean_out_dated_batches(); } } } From 74292109a44d9470092ae6a5e9180b0551774e32 Mon Sep 17 00:00:00 2001 From: xorsum Date: Sat, 6 Dec 2025 13:12:11 +0800 Subject: [PATCH 3/7] just split the loop --- .../datafusion-ext-plans/src/joins/smj/full_join.rs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs b/native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs index 3f2e06773..5246f697f 100644 --- a/native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs +++ b/native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs @@ -160,9 +160,14 @@ impl Joiner for FullJoiner Date: Sun, 7 Dec 2025 10:56:27 +0800 Subject: [PATCH 4/7] add has_enough_room and unit test --- .../src/joins/smj/full_join.rs | 22 ++- .../datafusion-ext-plans/src/joins/test.rs | 160 +++++++++++++++++- 2 files changed, 177 insertions(+), 5 deletions(-) diff --git a/native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs b/native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs index 5246f697f..2ac71a125 100644 --- a/native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs +++ b/native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs @@ -56,6 +56,10 @@ impl FullJoiner { self.lindices.len() >= self.join_params.batch_size } + fn has_enough_room(&self, new_size: usize) -> bool { + self.lindices.len() + new_size < self.join_params.batch_size + } + async fn flush( mut self: Pin<&mut Self>, cur1: &mut StreamCursor, @@ -160,12 +164,22 @@ impl Joiner for FullJoiner, + right: Arc, + on: JoinOn, + join_type: JoinType, + batch_size: usize + ) -> Result<(Vec, Vec)> { + MemManager::init(1000000); + let session_config = SessionConfig::new().with_batch_size(batch_size); + let session_ctx = SessionContext::new_with_config(session_config); + let session_ctx = SessionContext::new(); + let task_ctx = session_ctx.task_ctx(); + let schema = build_join_schema_for_test(&left.schema(), &right.schema(), join_type)?; + + let join: Arc = match test_type { + SMJ => { + let sort_options = vec![SortOptions::default(); on.len()]; + Arc::new(SortMergeJoinExec::try_new( + schema, + left, + right, + on, + join_type, + sort_options, + )?) + } + BHJLeftProbed => { + let right = Arc::new(BroadcastJoinBuildHashMapExec::new( + right, + on.iter().map(|(_, right_key)| right_key.clone()).collect(), + )); + Arc::new(BroadcastJoinExec::try_new( + schema, + left, + right, + on, + join_type, + JoinSide::Right, + true, + None, + )?) + } + BHJRightProbed => { + let left = Arc::new(BroadcastJoinBuildHashMapExec::new( + left, + on.iter().map(|(left_key, _)| left_key.clone()).collect(), + )); + Arc::new(BroadcastJoinExec::try_new( + schema, + left, + right, + on, + join_type, + JoinSide::Left, + true, + None, + )?) + } + SHJLeftProbed => Arc::new(BroadcastJoinExec::try_new( + schema, + left, + right, + on, + join_type, + JoinSide::Right, + false, + None, + )?), + SHJRightProbed => Arc::new(BroadcastJoinExec::try_new( + schema, + left, + right, + on, + join_type, + JoinSide::Left, + false, + None, + )?), + }; + let columns = columns(&join.schema()); + let stream = join.execute(0, task_ctx)?; + let batches = common::collect(stream).await?; + Ok((columns, batches)) + } + const ALL_TEST_TYPE: [TestType; 5] = [ SMJ, BHJLeftProbed, @@ -428,6 +514,78 @@ mod tests { Ok(()) } + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn join_inner_batchsize() -> Result<()> { + for test_type in ALL_TEST_TYPE { + let left = build_table( + ("a1", &vec![1, 1, 1, 1, 1]), + ("b1", &vec![1, 2, 3, 4, 5]), + ("c1", &vec![1, 2, 3, 4, 5]), + ); + let right = build_table( + ("a2", &vec![1, 1, 1, 1, 1, 1, 1]), + ("b2", &vec![1, 2, 3, 4, 5, 6, 7]), + ("c2", &vec![1, 2, 3, 4, 5, 6, 7]), + ); + let on: JoinOn = vec![( + Arc::new(Column::new_with_schema("a1", &left.schema())?), + Arc::new(Column::new_with_schema("a2", &right.schema())?), + )]; + let expected = vec![ + "+----+----+----+----+----+----+", + "| a1 | b1 | c1 | a2 | b2 | c2 |", + "+----+----+----+----+----+----+", + "| 1 | 1 | 1 | 1 | 1 | 1 |", + "| 1 | 1 | 1 | 1 | 2 | 2 |", + "| 1 | 1 | 1 | 1 | 3 | 3 |", + "| 1 | 1 | 1 | 1 | 4 | 4 |", + "| 1 | 1 | 1 | 1 | 5 | 5 |", + "| 1 | 1 | 1 | 1 | 6 | 6 |", + "| 1 | 1 | 1 | 1 | 7 | 7 |", + "| 1 | 2 | 2 | 1 | 1 | 1 |", + "| 1 | 2 | 2 | 1 | 2 | 2 |", + "| 1 | 2 | 2 | 1 | 3 | 3 |", + "| 1 | 2 | 2 | 1 | 4 | 4 |", + "| 1 | 2 | 2 | 1 | 5 | 5 |", + "| 1 | 2 | 2 | 1 | 6 | 6 |", + "| 1 | 2 | 2 | 1 | 7 | 7 |", + "| 1 | 3 | 3 | 1 | 1 | 1 |", + "| 1 | 3 | 3 | 1 | 2 | 2 |", + "| 1 | 3 | 3 | 1 | 3 | 3 |", + "| 1 | 3 | 3 | 1 | 4 | 4 |", + "| 1 | 3 | 3 | 1 | 5 | 5 |", + "| 1 | 3 | 3 | 1 | 6 | 6 |", + "| 1 | 3 | 3 | 1 | 7 | 7 |", + "| 1 | 4 | 4 | 1 | 1 | 1 |", + "| 1 | 4 | 4 | 1 | 2 | 2 |", + "| 1 | 4 | 4 | 1 | 3 | 3 |", + "| 1 | 4 | 4 | 1 | 4 | 4 |", + "| 1 | 4 | 4 | 1 | 5 | 5 |", + "| 1 | 4 | 4 | 1 | 6 | 6 |", + "| 1 | 4 | 4 | 1 | 7 | 7 |", + "| 1 | 5 | 5 | 1 | 1 | 1 |", + "| 1 | 5 | 5 | 1 | 2 | 2 |", + "| 1 | 5 | 5 | 1 | 3 | 3 |", + "| 1 | 5 | 5 | 1 | 4 | 4 |", + "| 1 | 5 | 5 | 1 | 5 | 5 |", + "| 1 | 5 | 5 | 1 | 6 | 6 |", + "| 1 | 5 | 5 | 1 | 7 | 7 |", + "+----+----+----+----+----+----+", + ]; + let (_, batches) = join_collect_with_batch_size(test_type, left.clone(), right.clone(), on.clone(), Inner, 2).await?; + assert_batches_sorted_eq!(expected, &batches); + let (_, batches) = join_collect_with_batch_size(test_type, left.clone(), right.clone(), on.clone(), Inner, 3).await?; + assert_batches_sorted_eq!(expected, &batches); + let (_, batches) = join_collect_with_batch_size(test_type, left.clone(), right.clone(), on.clone(), Inner, 4).await?; + assert_batches_sorted_eq!(expected, &batches); + let (_, batches) = join_collect_with_batch_size(test_type, left.clone(), right.clone(), on.clone(), Inner, 5).await?; + assert_batches_sorted_eq!(expected, &batches); + let (_, batches) = join_collect_with_batch_size(test_type, left.clone(), right.clone(), on.clone(), Inner, 7).await?; + assert_batches_sorted_eq!(expected, &batches); + } + Ok(()) + } + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn join_left_one() -> Result<()> { for test_type in ALL_TEST_TYPE { From d51385d7d8723b168ec8367e0311f75882922706 Mon Sep 17 00:00:00 2001 From: xorsum Date: Sun, 7 Dec 2025 13:51:46 +0800 Subject: [PATCH 5/7] format --- .../src/joins/smj/full_join.rs | 4 +- .../datafusion-ext-plans/src/joins/test.rs | 56 ++++++++++++++++--- 2 files changed, 51 insertions(+), 9 deletions(-) diff --git a/native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs b/native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs index 2ac71a125..1a9940be4 100644 --- a/native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs +++ b/native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs @@ -167,7 +167,9 @@ impl Joiner for FullJoiner, on: JoinOn, join_type: JoinType, - batch_size: usize + batch_size: usize, ) -> Result<(Vec, Vec)> { MemManager::init(1000000); let session_config = SessionConfig::new().with_batch_size(batch_size); @@ -572,15 +572,55 @@ mod tests { "| 1 | 5 | 5 | 1 | 7 | 7 |", "+----+----+----+----+----+----+", ]; - let (_, batches) = join_collect_with_batch_size(test_type, left.clone(), right.clone(), on.clone(), Inner, 2).await?; + let (_, batches) = join_collect_with_batch_size( + test_type, + left.clone(), + right.clone(), + on.clone(), + Inner, + 2, + ) + .await?; assert_batches_sorted_eq!(expected, &batches); - let (_, batches) = join_collect_with_batch_size(test_type, left.clone(), right.clone(), on.clone(), Inner, 3).await?; + let (_, batches) = join_collect_with_batch_size( + test_type, + left.clone(), + right.clone(), + on.clone(), + Inner, + 3, + ) + .await?; assert_batches_sorted_eq!(expected, &batches); - let (_, batches) = join_collect_with_batch_size(test_type, left.clone(), right.clone(), on.clone(), Inner, 4).await?; + let (_, batches) = join_collect_with_batch_size( + test_type, + left.clone(), + right.clone(), + on.clone(), + Inner, + 4, + ) + .await?; assert_batches_sorted_eq!(expected, &batches); - let (_, batches) = join_collect_with_batch_size(test_type, left.clone(), right.clone(), on.clone(), Inner, 5).await?; + let (_, batches) = join_collect_with_batch_size( + test_type, + left.clone(), + right.clone(), + on.clone(), + Inner, + 5, + ) + .await?; assert_batches_sorted_eq!(expected, &batches); - let (_, batches) = join_collect_with_batch_size(test_type, left.clone(), right.clone(), on.clone(), Inner, 7).await?; + let (_, batches) = join_collect_with_batch_size( + test_type, + left.clone(), + right.clone(), + on.clone(), + Inner, + 7, + ) + .await?; assert_batches_sorted_eq!(expected, &batches); } Ok(()) From 3c8a77ad9b2118fb52e260458ffaf87c9d5a77b9 Mon Sep 17 00:00:00 2001 From: bkhan Date: Mon, 8 Dec 2025 11:20:45 +0800 Subject: [PATCH 6/7] Update native-engine/datafusion-ext-plans/src/joins/test.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- native-engine/datafusion-ext-plans/src/joins/test.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/native-engine/datafusion-ext-plans/src/joins/test.rs b/native-engine/datafusion-ext-plans/src/joins/test.rs index d3d559dc7..8d34cdfc3 100644 --- a/native-engine/datafusion-ext-plans/src/joins/test.rs +++ b/native-engine/datafusion-ext-plans/src/joins/test.rs @@ -275,7 +275,6 @@ mod tests { MemManager::init(1000000); let session_config = SessionConfig::new().with_batch_size(batch_size); let session_ctx = SessionContext::new_with_config(session_config); - let session_ctx = SessionContext::new(); let task_ctx = session_ctx.task_ctx(); let schema = build_join_schema_for_test(&left.schema(), &right.schema(), join_type)?; From 44ab9e92e3ba4692c092b1e231b1918920cfc4a1 Mon Sep 17 00:00:00 2001 From: bkhan Date: Mon, 8 Dec 2025 11:20:54 +0800 Subject: [PATCH 7/7] Update native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs b/native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs index 1a9940be4..ecabfaed9 100644 --- a/native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs +++ b/native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs @@ -57,7 +57,7 @@ impl FullJoiner { } fn has_enough_room(&self, new_size: usize) -> bool { - self.lindices.len() + new_size < self.join_params.batch_size + self.lindices.len() + new_size <= self.join_params.batch_size } async fn flush(