Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .config/nextest.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[profile.ci.junit]
path = "junit.xml"
29 changes: 19 additions & 10 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ jobs:

# ───────────────────────── 2 · TEST & COVERAGE ────────────────────────────
coverage:
name: Unit Tests + Coverage (llvm-cov)
name: Unit Tests + Coverage (llvm-cov + nextest)
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
Expand All @@ -70,17 +70,26 @@ jobs:

- uses: taiki-e/install-action@v2
with: { tool: cargo-llvm-cov }
- uses: taiki-e/install-action@v2
with: { tool: nextest }

- name: Run tests and generate lcov.info
# ---------- Run the suite through nextest ----------
- name: Run tests via llvm-cov/nextest
id: run_tests
run: |
cargo llvm-cov --locked --lib \
--lcov --output-path lcov.info

- name: Upload to Codecov
uses: codecov/codecov-action@v5
cargo llvm-cov \
--locked \
--lib \
--lcov --output-path lcov.info \
nextest \
--profile ci \
--no-fail-fast

# ---------- Upload JUnit test results to Codecov ----------
- name: Upload test results (JUnit)
if: ${{ !cancelled() }}
uses: codecov/test-results-action@v1
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
with:
files: lcov.info
verbose: true
fail_ci_if_error: true
files: target/nextest/ci/junit.xml
12 changes: 8 additions & 4 deletions src/moonlink/src/storage/cache/object_storage/base_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
/// - evictable if a cache entry is unreferenced
use async_trait::async_trait;

use crate::storage::cache::object_storage::cache_handle::DataCacheHandle;
use crate::storage::cache::object_storage::cache_handle::NonEvictableHandle;
use crate::storage::storage_utils::FileId;
use crate::Result;

Expand Down Expand Up @@ -37,21 +37,25 @@ pub struct CacheEntry {
#[allow(dead_code)]
#[async_trait]
pub trait CacheTrait {
/// Import cache entry to the cache.
/// Import cache entry to the cache. If there's no enough disk space, panic directly.
/// Precondition: the file is not managed by cache.
#[allow(async_fn_in_trait)]
async fn _import_cache_entry(
&mut self,
file_id: FileId,
cache_entry: CacheEntry,
) -> (DataCacheHandle, Vec<String>);
) -> (NonEvictableHandle, Vec<String>);

/// Get file entry.
/// If the requested file entry doesn't exist in cache, an IO operation is performed.
/// If there's no sufficient disk space, return [`None`].
#[allow(async_fn_in_trait)]
async fn _get_cache_entry(
&mut self,
file_id: FileId,
remote_filepath: &str,
) -> Result<(DataCacheHandle, Vec<String>)>;
) -> Result<(
Option<NonEvictableHandle>,
Vec<String>, /*files_to_delete*/
)>;
}
22 changes: 0 additions & 22 deletions src/moonlink/src/storage/cache/object_storage/cache_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,25 +44,3 @@ impl NonEvictableHandle {
guard._unreference(self.file_id);
}
}

/// A unified handle for data file cache entries, which represents different states for a data file cache resource.
#[allow(dead_code)]
#[derive(Debug)]
pub enum DataCacheHandle {
/// Cache file is managed by data file already and at evictable state; should pin before use.
Evictable,
/// Cache file is managed by data file already and pinned, could use at any time.
NonEvictable(NonEvictableHandle),
}

impl DataCacheHandle {
/// Unreferenced the pinned cache file.
pub async fn _unreference(&mut self) {
match self {
DataCacheHandle::NonEvictable(handle) => {
handle._unreference().await;
}
_ => panic!("Cannot unreference for an unpinned cache handle"),
}
}
}
119 changes: 79 additions & 40 deletions src/moonlink/src/storage/cache/object_storage/object_storage_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::sync::Arc;
use crate::storage::cache::object_storage::base_cache::{
CacheEntry, CacheTrait, FileMetadata, ObjectStorageCacheConfig,
};
use crate::storage::cache::object_storage::cache_handle::{DataCacheHandle, NonEvictableHandle};
use crate::storage::cache::object_storage::cache_handle::NonEvictableHandle;
use crate::storage::storage_utils::FileId;
use crate::Result;

Expand Down Expand Up @@ -39,39 +39,65 @@ pub(crate) struct ObjectStorageCacheInternal {

impl ObjectStorageCacheInternal {
/// Util function to remove entries from evictable cache, until overall file size drops down below max size.
/// Return data files which get evicted from LRU cache, and will be deleted locally.
fn _evict_cache_entries(&mut self, max_bytes: u64) -> Vec<String> {
///
/// # Arguments
///
/// * tolerate_insufficiency: if true, tolerate disk space insufficiency by returning `false` in such case; otherwise panic if nothing to evict when insufficient disk space.
///
/// Return
/// - whether cache entries eviction succeeds or not.
/// - data files which get evicted from LRU cache, and will be deleted locally.
fn _evict_cache_entries(
&mut self,
max_bytes: u64,
tolerate_insufficiency: bool,
) -> (bool, Vec<String>) {
let mut evicted_data_files = vec![];
while self.cur_bytes > max_bytes {
// TODO(hjiang): In certain case, we could tolerate disk space insufficiency (i.e. when pg_mooncake request for data files, we could fallback to return remote files).
assert!(
!self.evictable_cache.is_empty(),
"Cannot reduce disk usage by evicting entries."
);
if self.evictable_cache.is_empty() {
assert!(
tolerate_insufficiency,
"Cannot reduce disk usage by evicting entries."
);
return (false, evicted_data_files);
}
let (_, mut cache_entry_wrapper) = self.evictable_cache.pop_lru().unwrap();
assert_eq!(cache_entry_wrapper.reference_count, 0);
self.cur_bytes -= cache_entry_wrapper.cache_entry.file_metadata.file_size;
let cache_filepath =
std::mem::take(&mut cache_entry_wrapper.cache_entry.cache_filepath);
evicted_data_files.push(cache_filepath);
}
evicted_data_files
(true, evicted_data_files)
}

/// Util function to insert into non-evictable cache.
/// NOTICE: cache current bytes won't be updated.
///
/// Return
/// - whether cache entries eviction succeeds or not.
/// - data files which get evicted from LRU cache, and will be deleted locally.
///
/// NOTICE:
/// - cache current bytes won't be updated.
/// - If insertion fails due to insufficiency, the input cache entry won't be inserted into cache.
fn _insert_non_evictable(
&mut self,
file_id: FileId,
cache_entry_wrapper: CacheEntryWrapper,
max_bytes: u64,
) -> Vec<String> {
tolerate_insufficiency: bool,
) -> (bool, Vec<String>) {
assert!(self.evictable_cache.get(&file_id).is_none());
let old_entry = self
assert!(self
.non_evictable_cache
.insert(file_id, cache_entry_wrapper);
assert!(old_entry.is_none());
self._evict_cache_entries(max_bytes)
.insert(file_id, cache_entry_wrapper)
.is_none());
let (evict_succ, files_to_delete) =
self._evict_cache_entries(max_bytes, tolerate_insufficiency);
if !evict_succ {
assert!(self.non_evictable_cache.remove(&file_id).is_some());
}
(evict_succ, files_to_delete)
}

/// Unreference the given cache entry.
Expand Down Expand Up @@ -139,7 +165,7 @@ impl CacheTrait for ObjectStorageCache {
&mut self,
file_id: FileId,
cache_entry: CacheEntry,
) -> (DataCacheHandle, Vec<String>) {
) -> (NonEvictableHandle, Vec<String>) {
let cache_entry_wrapper = CacheEntryWrapper {
cache_entry: cache_entry.clone(),
reference_count: 1,
Expand All @@ -148,21 +174,27 @@ impl CacheTrait for ObjectStorageCache {
let mut guard = self.cache.write().await;
guard.cur_bytes += cache_entry.file_metadata.file_size;

let cache_files_to_delete =
guard._insert_non_evictable(file_id, cache_entry_wrapper, self.config.max_bytes);
let cache_files_to_delete = guard
._insert_non_evictable(
file_id,
cache_entry_wrapper,
self.config.max_bytes,
/*tolerate_insufficiency=*/ false,
)
.1;
let non_evictable_handle =
NonEvictableHandle::_new(file_id, cache_entry, self.cache.clone());
(
DataCacheHandle::NonEvictable(non_evictable_handle),
cache_files_to_delete,
)
(non_evictable_handle, cache_files_to_delete)
}

async fn _get_cache_entry(
&mut self,
file_id: FileId,
remote_filepath: &str,
) -> Result<(DataCacheHandle, Vec<String>)> {
) -> Result<(
Option<NonEvictableHandle>,
Vec<String>, /*files_to_delete*/
)> {
{
let mut guard = self.cache.write().await;

Expand All @@ -174,7 +206,7 @@ impl CacheTrait for ObjectStorageCache {
let cache_entry = value.as_ref().unwrap().cache_entry.clone();
let non_evictable_handle =
NonEvictableHandle::_new(file_id, cache_entry, self.cache.clone());
return Ok((DataCacheHandle::NonEvictable(non_evictable_handle), vec![]));
return Ok((Some(non_evictable_handle), /*files_to_delete=*/ vec![]));
}

// Check evictable cache.
Expand All @@ -183,14 +215,18 @@ impl CacheTrait for ObjectStorageCache {
assert_eq!(value.as_ref().unwrap().reference_count, 0);
value.as_mut().unwrap().reference_count += 1;
let cache_entry = value.as_ref().unwrap().cache_entry.clone();
let files_to_delete =
guard._insert_non_evictable(file_id, value.unwrap(), self.config.max_bytes);
let files_to_delete = guard
._insert_non_evictable(
file_id,
value.unwrap(),
self.config.max_bytes,
/*tolerate_insufficiency=*/ true,
)
.1;
assert!(files_to_delete.is_empty());
let non_evictable_handle =
NonEvictableHandle::_new(file_id, cache_entry, self.cache.clone());
return Ok((
DataCacheHandle::NonEvictable(non_evictable_handle),
files_to_delete,
));
return Ok((Some(non_evictable_handle), /*files_to_delete=*/ vec![]));
}
}

Expand All @@ -210,12 +246,16 @@ impl CacheTrait for ObjectStorageCache {
{
let mut guard = self.cache.write().await;
guard.cur_bytes += cache_entry.file_metadata.file_size;
let evicted_entries =
guard._insert_non_evictable(file_id, cache_entry_wrapper, self.config.max_bytes);
Ok((
DataCacheHandle::NonEvictable(non_evictable_handle),
evicted_entries,
))
let (cache_succ, files_to_delete) = guard._insert_non_evictable(
file_id,
cache_entry_wrapper,
self.config.max_bytes,
/*tolerate_insufficiency=*/ true,
);
if cache_succ {
return Ok((Some(non_evictable_handle), files_to_delete));
}
Ok((None, files_to_delete))
}
}
}
Expand Down Expand Up @@ -248,7 +288,7 @@ mod tests {
let mut temp_cache = cache.clone();
let remote_file_directory = remote_file_directory.path().to_path_buf();

let handle = tokio::task::spawn_blocking(async move || -> DataCacheHandle {
let handle = tokio::task::spawn_blocking(async move || -> NonEvictableHandle {
let filename = format!("{}.parquet", idx);
let test_file = create_test_file(remote_file_directory.as_path(), &filename).await;
let data_file = create_data_file(
Expand All @@ -260,15 +300,14 @@ mod tests {
.await
.unwrap();
assert!(cache_to_delete.is_empty());
cache_handle
cache_handle.unwrap()
});
handle_futures.push(handle);
}

let results = futures::future::join_all(handle_futures).await;
for cur_handle_future in results.into_iter() {
let cur_cache_handle = cur_handle_future.unwrap().await;
let non_evictable_handle = get_non_evictable_cache_handle(&cur_cache_handle);
let non_evictable_handle = cur_handle_future.unwrap().await;
check_file_content(&non_evictable_handle.cache_entry.cache_filepath).await;
assert_eq!(
non_evictable_handle.cache_entry.file_metadata.file_size as usize,
Expand Down
Loading