From b53e161921a4b30d1ad45e8b221092e274fcb040 Mon Sep 17 00:00:00 2001 From: dentiny Date: Fri, 13 Jun 2025 15:30:15 -0700 Subject: [PATCH 1/2] [storage] Simplify cache output state and consider insufficient disk space (#458) * simplify cache out state * test for new state --- .../cache/object_storage/base_cache.rs | 12 +- .../cache/object_storage/cache_handle.rs | 22 --- .../object_storage/object_storage_cache.rs | 119 ++++++++----- .../cache/object_storage/state_tests.rs | 158 ++++++++---------- .../cache/object_storage/test_utils.rs | 32 +--- 5 files changed, 154 insertions(+), 189 deletions(-) diff --git a/src/moonlink/src/storage/cache/object_storage/base_cache.rs b/src/moonlink/src/storage/cache/object_storage/base_cache.rs index 257a04dcf..474996a7b 100644 --- a/src/moonlink/src/storage/cache/object_storage/base_cache.rs +++ b/src/moonlink/src/storage/cache/object_storage/base_cache.rs @@ -5,7 +5,7 @@ /// - evictable if a cache entry is unreferenced use async_trait::async_trait; -use crate::storage::cache::object_storage::cache_handle::DataCacheHandle; +use crate::storage::cache::object_storage::cache_handle::NonEvictableHandle; use crate::storage::storage_utils::FileId; use crate::Result; @@ -37,21 +37,25 @@ pub struct CacheEntry { #[allow(dead_code)] #[async_trait] pub trait CacheTrait { - /// Import cache entry to the cache. + /// Import cache entry to the cache. If there's no enough disk space, panic directly. /// Precondition: the file is not managed by cache. #[allow(async_fn_in_trait)] async fn _import_cache_entry( &mut self, file_id: FileId, cache_entry: CacheEntry, - ) -> (DataCacheHandle, Vec); + ) -> (NonEvictableHandle, Vec); /// Get file entry. /// If the requested file entry doesn't exist in cache, an IO operation is performed. + /// If there's no sufficient disk space, return [`None`]. #[allow(async_fn_in_trait)] async fn _get_cache_entry( &mut self, file_id: FileId, remote_filepath: &str, - ) -> Result<(DataCacheHandle, Vec)>; + ) -> Result<( + Option, + Vec, /*files_to_delete*/ + )>; } diff --git a/src/moonlink/src/storage/cache/object_storage/cache_handle.rs b/src/moonlink/src/storage/cache/object_storage/cache_handle.rs index f9c38b3a3..849bf784e 100644 --- a/src/moonlink/src/storage/cache/object_storage/cache_handle.rs +++ b/src/moonlink/src/storage/cache/object_storage/cache_handle.rs @@ -44,25 +44,3 @@ impl NonEvictableHandle { guard._unreference(self.file_id); } } - -/// A unified handle for data file cache entries, which represents different states for a data file cache resource. -#[allow(dead_code)] -#[derive(Debug)] -pub enum DataCacheHandle { - /// Cache file is managed by data file already and at evictable state; should pin before use. - Evictable, - /// Cache file is managed by data file already and pinned, could use at any time. - NonEvictable(NonEvictableHandle), -} - -impl DataCacheHandle { - /// Unreferenced the pinned cache file. - pub async fn _unreference(&mut self) { - match self { - DataCacheHandle::NonEvictable(handle) => { - handle._unreference().await; - } - _ => panic!("Cannot unreference for an unpinned cache handle"), - } - } -} diff --git a/src/moonlink/src/storage/cache/object_storage/object_storage_cache.rs b/src/moonlink/src/storage/cache/object_storage/object_storage_cache.rs index 0863da6a2..8f1056cc2 100644 --- a/src/moonlink/src/storage/cache/object_storage/object_storage_cache.rs +++ b/src/moonlink/src/storage/cache/object_storage/object_storage_cache.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use crate::storage::cache::object_storage::base_cache::{ CacheEntry, CacheTrait, FileMetadata, ObjectStorageCacheConfig, }; -use crate::storage::cache::object_storage::cache_handle::{DataCacheHandle, NonEvictableHandle}; +use crate::storage::cache::object_storage::cache_handle::NonEvictableHandle; use crate::storage::storage_utils::FileId; use crate::Result; @@ -39,15 +39,28 @@ pub(crate) struct ObjectStorageCacheInternal { impl ObjectStorageCacheInternal { /// Util function to remove entries from evictable cache, until overall file size drops down below max size. - /// Return data files which get evicted from LRU cache, and will be deleted locally. - fn _evict_cache_entries(&mut self, max_bytes: u64) -> Vec { + /// + /// # Arguments + /// + /// * tolerate_insufficiency: if true, tolerate disk space insufficiency by returning `false` in such case; otherwise panic if nothing to evict when insufficient disk space. + /// + /// Return + /// - whether cache entries eviction succeeds or not. + /// - data files which get evicted from LRU cache, and will be deleted locally. + fn _evict_cache_entries( + &mut self, + max_bytes: u64, + tolerate_insufficiency: bool, + ) -> (bool, Vec) { let mut evicted_data_files = vec![]; while self.cur_bytes > max_bytes { - // TODO(hjiang): In certain case, we could tolerate disk space insufficiency (i.e. when pg_mooncake request for data files, we could fallback to return remote files). - assert!( - !self.evictable_cache.is_empty(), - "Cannot reduce disk usage by evicting entries." - ); + if self.evictable_cache.is_empty() { + assert!( + tolerate_insufficiency, + "Cannot reduce disk usage by evicting entries." + ); + return (false, evicted_data_files); + } let (_, mut cache_entry_wrapper) = self.evictable_cache.pop_lru().unwrap(); assert_eq!(cache_entry_wrapper.reference_count, 0); self.cur_bytes -= cache_entry_wrapper.cache_entry.file_metadata.file_size; @@ -55,23 +68,36 @@ impl ObjectStorageCacheInternal { std::mem::take(&mut cache_entry_wrapper.cache_entry.cache_filepath); evicted_data_files.push(cache_filepath); } - evicted_data_files + (true, evicted_data_files) } /// Util function to insert into non-evictable cache. - /// NOTICE: cache current bytes won't be updated. + /// + /// Return + /// - whether cache entries eviction succeeds or not. + /// - data files which get evicted from LRU cache, and will be deleted locally. + /// + /// NOTICE: + /// - cache current bytes won't be updated. + /// - If insertion fails due to insufficiency, the input cache entry won't be inserted into cache. fn _insert_non_evictable( &mut self, file_id: FileId, cache_entry_wrapper: CacheEntryWrapper, max_bytes: u64, - ) -> Vec { + tolerate_insufficiency: bool, + ) -> (bool, Vec) { assert!(self.evictable_cache.get(&file_id).is_none()); - let old_entry = self + assert!(self .non_evictable_cache - .insert(file_id, cache_entry_wrapper); - assert!(old_entry.is_none()); - self._evict_cache_entries(max_bytes) + .insert(file_id, cache_entry_wrapper) + .is_none()); + let (evict_succ, files_to_delete) = + self._evict_cache_entries(max_bytes, tolerate_insufficiency); + if !evict_succ { + assert!(self.non_evictable_cache.remove(&file_id).is_some()); + } + (evict_succ, files_to_delete) } /// Unreference the given cache entry. @@ -139,7 +165,7 @@ impl CacheTrait for ObjectStorageCache { &mut self, file_id: FileId, cache_entry: CacheEntry, - ) -> (DataCacheHandle, Vec) { + ) -> (NonEvictableHandle, Vec) { let cache_entry_wrapper = CacheEntryWrapper { cache_entry: cache_entry.clone(), reference_count: 1, @@ -148,21 +174,27 @@ impl CacheTrait for ObjectStorageCache { let mut guard = self.cache.write().await; guard.cur_bytes += cache_entry.file_metadata.file_size; - let cache_files_to_delete = - guard._insert_non_evictable(file_id, cache_entry_wrapper, self.config.max_bytes); + let cache_files_to_delete = guard + ._insert_non_evictable( + file_id, + cache_entry_wrapper, + self.config.max_bytes, + /*tolerate_insufficiency=*/ false, + ) + .1; let non_evictable_handle = NonEvictableHandle::_new(file_id, cache_entry, self.cache.clone()); - ( - DataCacheHandle::NonEvictable(non_evictable_handle), - cache_files_to_delete, - ) + (non_evictable_handle, cache_files_to_delete) } async fn _get_cache_entry( &mut self, file_id: FileId, remote_filepath: &str, - ) -> Result<(DataCacheHandle, Vec)> { + ) -> Result<( + Option, + Vec, /*files_to_delete*/ + )> { { let mut guard = self.cache.write().await; @@ -174,7 +206,7 @@ impl CacheTrait for ObjectStorageCache { let cache_entry = value.as_ref().unwrap().cache_entry.clone(); let non_evictable_handle = NonEvictableHandle::_new(file_id, cache_entry, self.cache.clone()); - return Ok((DataCacheHandle::NonEvictable(non_evictable_handle), vec![])); + return Ok((Some(non_evictable_handle), /*files_to_delete=*/ vec![])); } // Check evictable cache. @@ -183,14 +215,18 @@ impl CacheTrait for ObjectStorageCache { assert_eq!(value.as_ref().unwrap().reference_count, 0); value.as_mut().unwrap().reference_count += 1; let cache_entry = value.as_ref().unwrap().cache_entry.clone(); - let files_to_delete = - guard._insert_non_evictable(file_id, value.unwrap(), self.config.max_bytes); + let files_to_delete = guard + ._insert_non_evictable( + file_id, + value.unwrap(), + self.config.max_bytes, + /*tolerate_insufficiency=*/ true, + ) + .1; + assert!(files_to_delete.is_empty()); let non_evictable_handle = NonEvictableHandle::_new(file_id, cache_entry, self.cache.clone()); - return Ok(( - DataCacheHandle::NonEvictable(non_evictable_handle), - files_to_delete, - )); + return Ok((Some(non_evictable_handle), /*files_to_delete=*/ vec![])); } } @@ -210,12 +246,16 @@ impl CacheTrait for ObjectStorageCache { { let mut guard = self.cache.write().await; guard.cur_bytes += cache_entry.file_metadata.file_size; - let evicted_entries = - guard._insert_non_evictable(file_id, cache_entry_wrapper, self.config.max_bytes); - Ok(( - DataCacheHandle::NonEvictable(non_evictable_handle), - evicted_entries, - )) + let (cache_succ, files_to_delete) = guard._insert_non_evictable( + file_id, + cache_entry_wrapper, + self.config.max_bytes, + /*tolerate_insufficiency=*/ true, + ); + if cache_succ { + return Ok((Some(non_evictable_handle), files_to_delete)); + } + Ok((None, files_to_delete)) } } } @@ -248,7 +288,7 @@ mod tests { let mut temp_cache = cache.clone(); let remote_file_directory = remote_file_directory.path().to_path_buf(); - let handle = tokio::task::spawn_blocking(async move || -> DataCacheHandle { + let handle = tokio::task::spawn_blocking(async move || -> NonEvictableHandle { let filename = format!("{}.parquet", idx); let test_file = create_test_file(remote_file_directory.as_path(), &filename).await; let data_file = create_data_file( @@ -260,15 +300,14 @@ mod tests { .await .unwrap(); assert!(cache_to_delete.is_empty()); - cache_handle + cache_handle.unwrap() }); handle_futures.push(handle); } let results = futures::future::join_all(handle_futures).await; for cur_handle_future in results.into_iter() { - let cur_cache_handle = cur_handle_future.unwrap().await; - let non_evictable_handle = get_non_evictable_cache_handle(&cur_cache_handle); + let non_evictable_handle = cur_handle_future.unwrap().await; check_file_content(&non_evictable_handle.cache_entry.cache_filepath).await; assert_eq!( non_evictable_handle.cache_entry.file_metadata.file_size as usize, diff --git a/src/moonlink/src/storage/cache/object_storage/state_tests.rs b/src/moonlink/src/storage/cache/object_storage/state_tests.rs index b0d123bcc..7ad2e00f2 100644 --- a/src/moonlink/src/storage/cache/object_storage/state_tests.rs +++ b/src/moonlink/src/storage/cache/object_storage/state_tests.rs @@ -11,8 +11,9 @@ /// /// State transfer to data file cache entries: /// (1) + create mooncake snapshot => (2) -/// (1) + requested to read => (3) -/// (2) + requested to read => (3) +/// (1) + requested to read + sufficient space => (3) +/// (2) + requested to read + sufficient space => (3) +/// (2) + requested to read + insufficient space => (2) /// (3) + requested to read => (3) /// (2) + new entry + sufficient space => (2) /// (2) + new entry + insufficient space => (1) @@ -20,11 +21,9 @@ /// (3) + query finishes + no reference count => (2) /// /// For more details, please refer to https://docs.google.com/document/d/1kwXIl4VPzhgzV4KP8yT42M35PfvMJW9PdjNTF7VNEfA/edit?usp=sharing -use crate::create_data_file; use crate::storage::cache::object_storage::base_cache::{ CacheEntry, CacheTrait, FileMetadata, ObjectStorageCacheConfig, }; -use crate::storage::cache::object_storage::cache_handle::DataCacheHandle; use crate::storage::cache::object_storage::object_storage_cache::ObjectStorageCache; use crate::storage::cache::object_storage::test_utils::*; use crate::storage::storage_utils::FileId; @@ -45,10 +44,9 @@ async fn test_cache_state_1_create_snashot() { let mut cache = get_test_object_storage_cache(&cache_file_directory); // Check cache handle status. - let (cache_handle, files_to_evict) = cache + let (_, files_to_evict) = cache ._import_cache_entry(/*file_id=*/ FileId(0), cache_entry) .await; - assert_non_evictable_cache_handle(&cache_handle).await; assert_non_evictable_cache_handle_ref_count( &mut cache, /*file_id=*/ FileId(0), @@ -71,14 +69,13 @@ async fn test_cache_1_requested_to_read() { let mut cache = get_test_object_storage_cache(&cache_file_directory); // Check cache handle status. - let (cache_handle, files_to_evict) = cache + let (_, files_to_evict) = cache ._get_cache_entry( /*file_id=*/ FileId(0), test_file.as_path().to_str().unwrap(), ) .await .unwrap(); - assert_non_evictable_cache_handle(&cache_handle).await; assert_non_evictable_cache_handle_ref_count( &mut cache, /*file_id=*/ FileId(0), @@ -92,9 +89,55 @@ async fn test_cache_1_requested_to_read() { assert_evictable_cache_size(&mut cache, /*expected_count=*/ 0).await; } -// (2) + requested to read => (3) +// (2) + requested to read + sufficient space => (3) #[tokio::test] -async fn test_cache_2_requested_to_read() { +async fn test_cache_2_requested_to_read_with_sufficient_space() { + let remote_file_directory = tempdir().unwrap(); + let cache_file_directory = tempdir().unwrap(); + let test_file_1 = create_test_file(remote_file_directory.path(), TEST_FILENAME_1).await; + let mut cache = ObjectStorageCache::_new(ObjectStorageCacheConfig { + max_bytes: CONTENT.len() as u64, + cache_directory: cache_file_directory.path().to_str().unwrap().to_string(), + }); + + // Import into cache first. + let cache_entry = CacheEntry { + cache_filepath: test_file_1.to_str().unwrap().to_string(), + file_metadata: FileMetadata { + file_size: CONTENT.len() as u64, + }, + }; + let (_, files_to_evict) = cache + ._import_cache_entry(/*file_id=*/ FileId(0), cache_entry) + .await; + assert_non_evictable_cache_handle_ref_count( + &mut cache, + /*file_id=*/ FileId(0), + /*expected_ref_count=*/ 1, + ) + .await; + assert!(files_to_evict.is_empty()); + + // Request to read, but failed to pin due to insufficient disk space. + let test_file_2 = create_test_file(remote_file_directory.path(), TEST_FILENAME_2).await; + let (cache_handle, files_to_evict) = cache + ._get_cache_entry( + /*file_id=*/ FileId(1), + test_file_2.as_path().to_str().unwrap(), + ) + .await + .unwrap(); + assert!(cache_handle.is_none()); + assert!(files_to_evict.is_empty()); + + // Check cache status. + assert_non_evictable_cache_size(&mut cache, /*expected_count=*/ 1).await; + assert_evictable_cache_size(&mut cache, /*expected_count=*/ 0).await; +} + +// (2) + requested to read + insufficient space => (2) +#[tokio::test] +async fn test_cache_2_requested_to_read_with_insufficient_space() { let remote_file_directory = tempdir().unwrap(); let cache_file_directory = tempdir().unwrap(); let test_file = create_test_file(remote_file_directory.path(), TEST_FILENAME_1).await; @@ -110,7 +153,6 @@ async fn test_cache_2_requested_to_read() { let (mut cache_handle, files_to_evict) = cache ._import_cache_entry(/*file_id=*/ FileId(0), cache_entry) .await; - assert_non_evictable_cache_handle(&cache_handle).await; assert_non_evictable_cache_handle_ref_count( &mut cache, /*file_id=*/ FileId(0), @@ -123,14 +165,13 @@ async fn test_cache_2_requested_to_read() { cache_handle._unreference().await; // Request to read, thus pinning the cache entry. - let (cache_handle, files_to_evict) = cache + let (_, files_to_evict) = cache ._get_cache_entry( /*file_id=*/ FileId(0), test_file.as_path().to_str().unwrap(), ) .await .unwrap(); - assert_non_evictable_cache_handle(&cache_handle).await; assert_non_evictable_cache_handle_ref_count( &mut cache, /*file_id=*/ FileId(0), @@ -159,10 +200,9 @@ async fn test_cache_3_requested_to_read() { file_size: CONTENT.len() as u64, }, }; - let (cache_handle, files_to_evict) = cache + let (_, files_to_evict) = cache ._import_cache_entry(/*file_id=*/ FileId(0), cache_entry) .await; - assert_non_evictable_cache_handle(&cache_handle).await; assert_non_evictable_cache_handle_ref_count( &mut cache, /*file_id=*/ FileId(0), @@ -172,14 +212,13 @@ async fn test_cache_3_requested_to_read() { assert!(files_to_evict.is_empty()); // Request to read, thus pinning the cache entry. - let (cache_handle, files_to_evict) = cache + let (_, files_to_evict) = cache ._get_cache_entry( /*file_id=*/ FileId(0), test_file.as_path().to_str().unwrap(), ) .await .unwrap(); - assert_non_evictable_cache_handle(&cache_handle).await; assert_non_evictable_cache_handle_ref_count( &mut cache, /*file_id=*/ FileId(0), @@ -214,7 +253,6 @@ async fn test_cache_2_new_entry_with_sufficient_space() { let (mut cache_handle, files_to_evict) = cache ._import_cache_entry(/*file_id=*/ FileId(0), cache_entry) .await; - assert_non_evictable_cache_handle(&cache_handle).await; assert_non_evictable_cache_handle_ref_count( &mut cache, /*file_id=*/ FileId(0), @@ -234,10 +272,9 @@ async fn test_cache_2_new_entry_with_sufficient_space() { file_size: CONTENT.len() as u64, }, }; - let (cache_handle, files_to_evict) = cache + let (_, files_to_evict) = cache ._import_cache_entry(/*file_id=*/ FileId(1), cache_entry) .await; - assert_non_evictable_cache_handle(&cache_handle).await; assert_non_evictable_cache_handle_ref_count( &mut cache, /*file_id=*/ FileId(1), @@ -272,7 +309,6 @@ async fn test_cache_2_new_entry_with_insufficient_space() { let (mut cache_handle_1, files_to_evict) = cache ._import_cache_entry(/*file_id=*/ FileId(0), cache_entry) .await; - assert_non_evictable_cache_handle(&cache_handle_1).await; assert_non_evictable_cache_handle_ref_count( &mut cache, /*file_id=*/ FileId(0), @@ -292,20 +328,16 @@ async fn test_cache_2_new_entry_with_insufficient_space() { file_size: CONTENT.len() as u64, }, }; - let (cache_handle_2, files_to_evict) = cache + let (_, files_to_evict) = cache ._import_cache_entry(/*file_id=*/ FileId(1), cache_entry) .await; - assert_non_evictable_cache_handle(&cache_handle_2).await; assert_non_evictable_cache_handle_ref_count( &mut cache, /*file_id=*/ FileId(1), /*expected_ref_count=*/ 1, ) .await; - let cache_file_1 = get_non_evictable_cache_handle(&cache_handle_1) - .cache_entry - .cache_filepath - .clone(); + let cache_file_1 = cache_handle_1.cache_entry.cache_filepath.clone(); assert_eq!(files_to_evict, vec![cache_file_1]); // Check cache status. @@ -322,14 +354,13 @@ async fn test_cache_3_unpin_still_referenced() { let mut cache = get_test_object_storage_cache(&cache_file_directory); // Check cache handle status. - let (cache_handle, files_to_evict) = cache + let (_, files_to_evict) = cache ._get_cache_entry( /*file_id=*/ FileId(0), test_file.as_path().to_str().unwrap(), ) .await .unwrap(); - assert_non_evictable_cache_handle(&cache_handle).await; assert_non_evictable_cache_handle_ref_count( &mut cache, /*file_id=*/ FileId(0), @@ -339,14 +370,13 @@ async fn test_cache_3_unpin_still_referenced() { assert!(files_to_evict.is_empty()); // Get the same cache entry again to increase its reference count. - let (mut cache_handle, files_to_evict) = cache + let (cache_handle, files_to_evict) = cache ._get_cache_entry( /*file_id=*/ FileId(0), test_file.as_path().to_str().unwrap(), ) .await .unwrap(); - assert_non_evictable_cache_handle(&cache_handle).await; assert_non_evictable_cache_handle_ref_count( &mut cache, /*file_id=*/ FileId(0), @@ -356,7 +386,7 @@ async fn test_cache_3_unpin_still_referenced() { assert!(files_to_evict.is_empty()); // Unreference one of the cache handles. - cache_handle._unreference().await; + cache_handle.unwrap()._unreference().await; // Check cache status. assert_non_evictable_cache_size(&mut cache, /*expected_count=*/ 1).await; @@ -372,14 +402,13 @@ async fn test_cache_3_unpin_not_referenced() { let mut cache = get_test_object_storage_cache(&cache_file_directory); // Check cache handle status. - let (mut cache_handle_1, files_to_evict) = cache + let (cache_handle_1, files_to_evict) = cache ._get_cache_entry( /*file_id=*/ FileId(0), test_file.as_path().to_str().unwrap(), ) .await .unwrap(); - assert_non_evictable_cache_handle(&cache_handle_1).await; assert_non_evictable_cache_handle_ref_count( &mut cache, /*file_id=*/ FileId(0), @@ -389,14 +418,13 @@ async fn test_cache_3_unpin_not_referenced() { assert!(files_to_evict.is_empty()); // Get the same cache entry again to increase its reference count. - let (mut cache_handle_2, files_to_evict) = cache + let (cache_handle_2, files_to_evict) = cache ._get_cache_entry( /*file_id=*/ FileId(0), test_file.as_path().to_str().unwrap(), ) .await .unwrap(); - assert_non_evictable_cache_handle(&cache_handle_2).await; assert_non_evictable_cache_handle_ref_count( &mut cache, /*file_id=*/ FileId(0), @@ -406,64 +434,10 @@ async fn test_cache_3_unpin_not_referenced() { assert!(files_to_evict.is_empty()); // Unreference all cache handles. - cache_handle_1._unreference().await; - cache_handle_2._unreference().await; + cache_handle_1.unwrap()._unreference().await; + cache_handle_2.unwrap()._unreference().await; // Check cache status. assert_non_evictable_cache_size(&mut cache, /*expected_count=*/ 0).await; assert_evictable_cache_size(&mut cache, /*expected_count=*/ 1).await; } - -#[tokio::test] -async fn test_concurrent_data_file_cache() { - const PARALLEL_TASK_NUM: usize = 10; - let mut handle_futures = Vec::with_capacity(PARALLEL_TASK_NUM); - - let cache_file_directory = tempdir().unwrap(); - let remote_file_directory = tempdir().unwrap(); - - let config = ObjectStorageCacheConfig { - // Set max bytes larger than one file, but less than two files. - max_bytes: (CONTENT.len() * PARALLEL_TASK_NUM) as u64, - cache_directory: cache_file_directory.path().to_str().unwrap().to_string(), - }; - let cache = ObjectStorageCache::_new(config); - - for idx in 0..PARALLEL_TASK_NUM { - let mut temp_cache = cache.clone(); - let remote_file_directory = remote_file_directory.path().to_path_buf(); - - let handle = tokio::task::spawn_blocking(async move || -> DataCacheHandle { - let filename = format!("{}.parquet", idx); - let test_file = create_test_file(remote_file_directory.as_path(), &filename).await; - let data_file = create_data_file( - /*file_id=*/ idx as u64, - test_file.to_str().unwrap().to_string(), - ); - let (cache_handle, cache_to_delete) = temp_cache - ._get_cache_entry(data_file.file_id(), data_file.file_path()) - .await - .unwrap(); - assert!(cache_to_delete.is_empty()); - cache_handle - }); - handle_futures.push(handle); - } - - let results = futures::future::join_all(handle_futures).await; - for cur_handle_future in results.into_iter() { - let cur_cache_handle = cur_handle_future.unwrap().await; - let non_evictable_handle = get_non_evictable_cache_handle(&cur_cache_handle); - check_file_content(&non_evictable_handle.cache_entry.cache_filepath).await; - assert_eq!( - non_evictable_handle.cache_entry.file_metadata.file_size as usize, - CONTENT.len() - ); - } - assert_eq!(cache.cache.read().await.evictable_cache.len(), 0); - assert_eq!( - cache.cache.read().await.non_evictable_cache.len(), - PARALLEL_TASK_NUM - ); - check_cache_file_count(&cache_file_directory, PARALLEL_TASK_NUM).await; -} diff --git a/src/moonlink/src/storage/cache/object_storage/test_utils.rs b/src/moonlink/src/storage/cache/object_storage/test_utils.rs index 1e0f2f202..ea7af2a73 100644 --- a/src/moonlink/src/storage/cache/object_storage/test_utils.rs +++ b/src/moonlink/src/storage/cache/object_storage/test_utils.rs @@ -3,9 +3,7 @@ use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; use crate::storage::cache::object_storage::{ - base_cache::ObjectStorageCacheConfig, - cache_handle::{DataCacheHandle, NonEvictableHandle}, - object_storage_cache::ObjectStorageCache, + base_cache::ObjectStorageCacheConfig, object_storage_cache::ObjectStorageCache, }; use crate::storage::storage_utils::FileId; @@ -63,34 +61,6 @@ pub(crate) async fn check_cache_file_count(tmp_dir: &TempDir, expected_count: us assert_eq!(actual_count, expected_count); } -/// Test util function to assert returned cache handle is non-evictable. -pub(crate) async fn assert_non_evictable_cache_handle(data_cache_handle: &DataCacheHandle) { - let non_evictable_handle = match data_cache_handle { - DataCacheHandle::NonEvictable(non_evictable_handle) => non_evictable_handle, - _ => { - panic!("Expects to get non-evictable cache handle, but get unimported or evictable one") - } - }; - // Check cache file existence. - assert!( - tokio::fs::try_exists(&non_evictable_handle.cache_entry.cache_filepath) - .await - .unwrap() - ); -} - -/// Test util function to assert and get non-evictable cache handle. -pub(crate) fn get_non_evictable_cache_handle( - data_cache_handle: &DataCacheHandle, -) -> &NonEvictableHandle { - match data_cache_handle { - DataCacheHandle::NonEvictable(handle) => handle, - _ => { - panic!("Expects to get non-evictable cache handle, but get unimported or evictable one") - } - } -} - /// Test util function to check evictable cache size. pub(crate) async fn assert_evictable_cache_size( cache: &mut ObjectStorageCache, From a0f87a13fd34926bb31f9c2b38d6ad073103bfc6 Mon Sep 17 00:00:00 2001 From: Nolan Biscaro Date: Sat, 14 Jun 2025 18:07:18 +0000 Subject: [PATCH 2/2] add test analytics --- .config/nextest.toml | 2 ++ .github/workflows/ci.yml | 29 +++++++++++++++++++---------- 2 files changed, 21 insertions(+), 10 deletions(-) create mode 100644 .config/nextest.toml diff --git a/.config/nextest.toml b/.config/nextest.toml new file mode 100644 index 000000000..76fd74b5d --- /dev/null +++ b/.config/nextest.toml @@ -0,0 +1,2 @@ +[profile.ci.junit] +path = "junit.xml" diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0d03d6cea..f72b5878f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -54,7 +54,7 @@ jobs: # ───────────────────────── 2 · TEST & COVERAGE ──────────────────────────── coverage: - name: Unit Tests + Coverage (llvm-cov) + name: Unit Tests + Coverage (llvm-cov + nextest) runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 @@ -70,17 +70,26 @@ jobs: - uses: taiki-e/install-action@v2 with: { tool: cargo-llvm-cov } + - uses: taiki-e/install-action@v2 + with: { tool: nextest } - - name: Run tests and generate lcov.info + # ---------- Run the suite through nextest ---------- + - name: Run tests via llvm-cov/nextest + id: run_tests run: | - cargo llvm-cov --locked --lib \ - --lcov --output-path lcov.info - - - name: Upload to Codecov - uses: codecov/codecov-action@v5 + cargo llvm-cov \ + --locked \ + --lib \ + --lcov --output-path lcov.info \ + nextest \ + --profile ci \ + --no-fail-fast + + # ---------- Upload JUnit test results to Codecov ---------- + - name: Upload test results (JUnit) + if: ${{ !cancelled() }} + uses: codecov/test-results-action@v1 env: CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} with: - files: lcov.info - verbose: true - fail_ci_if_error: true + files: target/nextest/ci/junit.xml