Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ composefs-ioctls = { version = "0.3.0", path = "crates/composefs-ioctls", defaul
composefs-oci = { version = "0.3.0", path = "crates/composefs-oci", default-features = false }
composefs-boot = { version = "0.3.0", path = "crates/composefs-boot", default-features = false }
composefs-http = { version = "0.3.0", path = "crates/composefs-http", default-features = false }
cap-std-ext = "5.0"
ocidir = "0.7.1"

[profile.dev.package.sha2]
# this is *really* slow otherwise
Expand Down
3 changes: 3 additions & 0 deletions crates/composefs-oci/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ sha2 = { version = "0.10.1", default-features = false }
tar-core = "0.1.0"
tokio = { version = "1.24.2", features = ["rt-multi-thread"] }
tokio-util = { version = "0.7", default-features = false, features = ["io"] }
tracing = "0.1"
cap-std-ext = { workspace = true }
ocidir = { workspace = true }

[dev-dependencies]
similar-asserts = "1.7.0"
Expand Down
99 changes: 99 additions & 0 deletions crates/composefs-oci/src/layer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
//! Shared layer import logic for OCI container images.
//!
//! This module provides common functionality for importing OCI image layers
//! into a composefs repository, shared between the skopeo proxy path and
//! direct OCI layout import.

use std::sync::Arc;

use anyhow::{bail, Result};
use async_compression::tokio::bufread::{GzipDecoder, ZstdDecoder};
use oci_spec::image::MediaType;
use tokio::io::{AsyncRead, AsyncWriteExt, BufReader};

use composefs::fsverity::FsVerityHashValue;
use composefs::repository::{ObjectStoreMethod, Repository};
use composefs::shared_internals::IO_BUF_CAPACITY;

use crate::skopeo::TAR_LAYER_CONTENT_TYPE;
use crate::tar::split_async;

/// Check if a media type represents a tar-based layer.
pub fn is_tar_media_type(media_type: &MediaType) -> bool {
matches!(
media_type,
MediaType::ImageLayer
| MediaType::ImageLayerGzip
| MediaType::ImageLayerZstd
| MediaType::ImageLayerNonDistributable
| MediaType::ImageLayerNonDistributableGzip
| MediaType::ImageLayerNonDistributableZstd
)
}

/// Wrap an async reader with the appropriate decompressor for the media type.
///
/// Returns a boxed reader that decompresses the stream if needed.
/// The output is `AsyncRead` (not `AsyncBufRead`) because `split_async`
/// does its own buffering via `BytesMut`.
///
/// # Errors
///
/// Fails for media types that are not tar layer types
/// (see [`is_tar_media_type`]).
pub fn decompress_async<'a, R>(
    reader: R,
    media_type: &MediaType,
) -> Result<Box<dyn AsyncRead + Unpin + Send + 'a>>
where
    R: AsyncRead + Unpin + Send + 'a,
{
    let reader: Box<dyn AsyncRead + Unpin + Send + 'a> = match media_type {
        // Uncompressed: a single large buffer directly over the raw stream.
        // (Wrapping the raw stream in a default BufReader first, as before,
        // would force every byte through a redundant intermediate buffer.)
        MediaType::ImageLayer | MediaType::ImageLayerNonDistributable => {
            Box::new(BufReader::with_capacity(IO_BUF_CAPACITY, reader))
        }
        // The decoders need `AsyncBufRead` input, so buffer the compressed
        // stream with a default-sized BufReader, then buffer the (larger)
        // decompressed output with IO_BUF_CAPACITY.
        MediaType::ImageLayerGzip | MediaType::ImageLayerNonDistributableGzip => Box::new(
            BufReader::with_capacity(IO_BUF_CAPACITY, GzipDecoder::new(BufReader::new(reader))),
        ),
        MediaType::ImageLayerZstd | MediaType::ImageLayerNonDistributableZstd => Box::new(
            BufReader::with_capacity(IO_BUF_CAPACITY, ZstdDecoder::new(BufReader::new(reader))),
        ),
        _ => bail!("Unsupported layer media type for decompression: {media_type}"),
    };
    Ok(reader)
}

/// Import a tar layer from an async reader into the repository.
///
/// The reader should already be decompressed (use `decompress_async` first).
/// Returns the fs-verity object ID and import stats of the imported splitstream.
pub async fn import_tar_async<ObjectID, R>(
    repo: Arc<Repository<ObjectID>>,
    reader: R,
) -> Result<(ObjectID, crate::ImportStats)>
where
    ObjectID: FsVerityHashValue,
    R: AsyncRead + Unpin + Send,
{
    // All the heavy lifting happens in the splitstream writer; we just tag
    // the stream with the tar layer content type.
    let outcome = split_async(reader, repo, TAR_LAYER_CONTENT_TYPE).await?;
    Ok(outcome)
}

/// Store raw bytes from an async reader as a repository object.
///
/// Streams the raw bytes into a repository object without creating a splitstream.
/// Use this for non-tar blobs (OCI artifacts) where the caller will create
/// the splitstream wrapper.
///
/// Returns (object_id, size, store_method) of the stored object.
pub async fn store_blob_async<ObjectID, R>(
    repo: &Repository<ObjectID>,
    mut reader: R,
) -> Result<(ObjectID, u64, ObjectStoreMethod)>
where
    ObjectID: FsVerityHashValue,
    R: AsyncRead + Unpin,
{
    // Stage the content in a repository-owned tmpfile, going via std::fs::File
    // to obtain an async handle for the copy.
    let staged = repo.create_object_tmpfile()?;
    let mut dest = tokio::fs::File::from(std::fs::File::from(staged));

    // Drain the reader into the tmpfile, recording how many bytes flowed.
    let byte_count = tokio::io::copy(&mut reader, &mut dest).await?;
    dest.flush().await?;

    // Convert back to a synchronous file and let the repository compute the
    // object id and pick the storage method.
    let staged = dest.into_std().await;
    let (object_id, store_method) = repo.finalize_object_tmpfile(staged, byte_count)?;
    Ok((object_id, byte_count, store_method))
}
161 changes: 158 additions & 3 deletions crates/composefs-oci/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,17 @@
#![forbid(unsafe_code)]

pub mod image;
pub mod layer;
pub mod oci_image;
pub mod oci_layout;
pub mod skopeo;
pub mod tar;

use std::{collections::HashMap, sync::Arc};

use anyhow::{bail, ensure, Context, Result};
use containers_image_proxy::ImageProxyConfig;
use oci_spec::image::ImageConfiguration;
use oci_spec::image::{Descriptor, ImageConfiguration, MediaType};
use sha2::{Digest, Sha256};

use composefs::{
Expand Down Expand Up @@ -175,6 +177,28 @@ pub async fn pull<ObjectID: FsVerityHashValue>(
})
}

/// Extract ordered diff_ids from a config descriptor.
///
/// For standard container images (ImageConfig media type), parses the
/// config JSON and returns `rootfs.diff_ids`. For artifacts with
/// non-standard config types, falls back to using manifest layer
/// digests as identifiers.
pub(crate) fn extract_diff_ids(
    media_type: &MediaType,
    config_bytes: &[u8],
    manifest_layers: &[Descriptor],
) -> Result<Vec<String>> {
    match media_type {
        // A real image config: the authoritative layer order lives in
        // rootfs.diff_ids inside the JSON.
        MediaType::ImageConfig => {
            let parsed = ImageConfiguration::from_reader(config_bytes)?;
            Ok(parsed.rootfs().diff_ids().to_vec())
        }
        // Artifact configs carry no rootfs; fall back to the manifest's
        // layer digests in manifest order.
        _ => {
            let ids = manifest_layers
                .iter()
                .map(|descriptor| descriptor.digest().to_string())
                .collect();
            Ok(ids)
        }
    }
}

fn hash(bytes: &[u8]) -> String {
let mut context = Sha256::new();
context.update(bytes);
Expand Down Expand Up @@ -235,8 +259,12 @@ pub fn write_config<ObjectID: FsVerityHashValue>(
let json_bytes = json.as_bytes();
let config_digest = hash(json_bytes);
let mut stream = repo.create_stream(OCI_CONFIG_CONTENT_TYPE);
for (name, value) in &refs {
stream.add_named_stream_ref(name, value)
// Add refs in config-defined diff_id order for deterministic output
for diff_id in config.rootfs().diff_ids() {
let value = refs
.get(diff_id.as_str())
.with_context(|| format!("missing layer verity for diff_id {diff_id}"))?;
stream.add_named_stream_ref(diff_id, value);
}
stream.write_external(json_bytes)?;
let id = repo.write_stream(stream, &config_identifier(&config_digest), None)?;
Expand Down Expand Up @@ -343,6 +371,64 @@ mod test {
");
}

#[tokio::test]
async fn test_layer_import_stats() {
    // Compute the layer digest the same way a registry would: sha256 of
    // the uncompressed tar bytes.
    let layer = example_layer();
    let mut hasher = Sha256::new();
    hasher.update(&layer);
    let digest = format!("sha256:{}", hex::encode(hasher.finalize()));

    let repo_dir = tempdir();
    let repo = Arc::new(Repository::<Sha256HashValue>::open_path(CWD, &repo_dir).unwrap());
    let (_id, stats) = import_layer(&repo, &digest, Some("name"), &layer[..])
        .await
        .unwrap();

    // example_layer() holds files of sizes 0, 4095, 4096 and 4097 bytes.
    // Anything larger than INLINE_CONTENT_MAX (64 bytes) becomes an
    // external object, so exactly the three big files are copied.
    assert_eq!(
        stats.objects_copied, 3,
        "three files above inline threshold should be external objects"
    );
    assert_eq!(stats.objects_already_present, 0);
    assert!(
        stats.bytes_copied > 0,
        "bytes_copied should be nonzero for external objects"
    );
    assert!(
        stats.bytes_inlined > 0,
        "bytes_inlined should be nonzero (tar headers + small file)"
    );
}

#[tokio::test]
async fn test_layer_import_deduplication_stats() {
    let layer = example_layer();
    let mut hasher = Sha256::new();
    hasher.update(&layer);
    let digest = format!("sha256:{}", hex::encode(hasher.finalize()));

    let repo_dir = tempdir();
    let repo = Arc::new(Repository::<Sha256HashValue>::open_path(CWD, &repo_dir).unwrap());

    // Initial import: the three above-inline-threshold files become
    // external objects.
    let (_id, first) = import_layer(&repo, &digest, None, &layer[..])
        .await
        .unwrap();
    assert_eq!(first.objects_copied, 3);
    assert_eq!(first.objects_already_present, 0);

    // Importing the identical layer again finds the stream already present
    // and returns early with all-zero stats (idempotent).
    let (_id, second) = import_layer(&repo, &digest, None, &layer[..])
        .await
        .unwrap();
    assert_eq!(second.objects_copied, 0);
    assert_eq!(second.objects_already_present, 0);
    assert_eq!(second.bytes_copied, 0);
}

#[test]
fn test_write_and_open_config() {
use oci_spec::image::{ImageConfigurationBuilder, RootFsBuilder};
Expand Down Expand Up @@ -440,6 +526,75 @@ mod test {
);
}

#[tokio::test]
async fn test_config_verity_deterministic() -> Result<()> {
    use oci_spec::image::{ImageConfigurationBuilder, RootFsBuilder};

    let repo_dir = tempdir();
    let repo = Arc::new(Repository::<Sha256HashValue>::open_path(CWD, &repo_dir).unwrap());

    // Create 3 distinct layers with different content
    let mut layers = Vec::new();
    for (name, size) in [("alpha", 1000), ("beta", 2000), ("gamma", 3000)] {
        let mut builder = ::tar::Builder::new(vec![]);
        append_data(&mut builder, name, size);
        let layer = builder.into_inner().unwrap();

        // diff_id is the sha256 digest of the uncompressed tar stream.
        let mut context = Sha256::new();
        context.update(&layer);
        let diff_id = format!("sha256:{}", hex::encode(context.finalize()));

        let (verity, _stats) = import_layer(&repo, &diff_id, None, &mut layer.as_slice())
            .await
            .unwrap();
        layers.push((diff_id, verity));
    }

    // Build a minimal image config whose rootfs lists the three diff_ids
    // in a fixed order; this order is what write_config must follow.
    let diff_ids: Vec<String> = layers.iter().map(|(d, _)| d.clone()).collect();
    let config = ImageConfigurationBuilder::default()
        .architecture("amd64")
        .os("linux")
        .rootfs(
            RootFsBuilder::default()
                .typ("layers")
                .diff_ids(diff_ids.clone())
                .build()
                .unwrap(),
        )
        .build()
        .unwrap();

    // Build refs HashMaps with different insertion orders to exercise
    // that write_config uses config-defined diff_id order, not HashMap order.
    let refs1: HashMap<Box<str>, Sha256HashValue> = layers
        .iter()
        .map(|(d, v)| (d.as_str().into(), v.clone()))
        .collect();
    let refs2: HashMap<Box<str>, Sha256HashValue> = layers
        .iter()
        .rev()
        .map(|(d, v)| (d.as_str().into(), v.clone()))
        .collect();

    let (_digest1, verity1) = write_config(&repo, &config, refs1)?;
    let (_digest2, verity2) = write_config(&repo, &config, refs2)?;

    // The verity must be identical regardless of HashMap iteration order
    assert_eq!(
        verity1, verity2,
        "config verity must be deterministic across calls"
    );

    // Hardcoded expected value to catch any accidental changes
    // (e.g. a change to stream-ref ordering or serialization format).
    assert_eq!(
        verity1.to_hex(),
        "7752e739a60d1e697ac96545fc23d9a6e5f254625fd313a146722781cef0ffac",
        "config verity changed unexpectedly"
    );

    Ok(())
}

#[test]
fn test_open_config_bad_hash() {
use oci_spec::image::{ImageConfigurationBuilder, RootFsBuilder};
Expand Down
Loading
Loading