diff --git a/Cargo.toml b/Cargo.toml index d36fdf62..bcc2ad4f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,8 @@ composefs-ioctls = { version = "0.3.0", path = "crates/composefs-ioctls", defaul composefs-oci = { version = "0.3.0", path = "crates/composefs-oci", default-features = false } composefs-boot = { version = "0.3.0", path = "crates/composefs-boot", default-features = false } composefs-http = { version = "0.3.0", path = "crates/composefs-http", default-features = false } +cap-std-ext = "5.0" +ocidir = "0.7.1" [profile.dev.package.sha2] # this is *really* slow otherwise diff --git a/crates/composefs-oci/Cargo.toml b/crates/composefs-oci/Cargo.toml index eda0e0ec..c84948a9 100644 --- a/crates/composefs-oci/Cargo.toml +++ b/crates/composefs-oci/Cargo.toml @@ -27,6 +27,9 @@ sha2 = { version = "0.10.1", default-features = false } tar-core = "0.1.0" tokio = { version = "1.24.2", features = ["rt-multi-thread"] } tokio-util = { version = "0.7", default-features = false, features = ["io"] } +tracing = "0.1" +cap-std-ext = { workspace = true } +ocidir = { workspace = true } [dev-dependencies] similar-asserts = "1.7.0" diff --git a/crates/composefs-oci/src/layer.rs b/crates/composefs-oci/src/layer.rs new file mode 100644 index 00000000..54a6a49f --- /dev/null +++ b/crates/composefs-oci/src/layer.rs @@ -0,0 +1,99 @@ +//! Shared layer import logic for OCI container images. +//! +//! This module provides common functionality for importing OCI image layers +//! into a composefs repository, shared between the skopeo proxy path and +//! direct OCI layout import. 
+ +use std::sync::Arc; + +use anyhow::{bail, Result}; +use async_compression::tokio::bufread::{GzipDecoder, ZstdDecoder}; +use oci_spec::image::MediaType; +use tokio::io::{AsyncRead, AsyncWriteExt, BufReader}; + +use composefs::fsverity::FsVerityHashValue; +use composefs::repository::{ObjectStoreMethod, Repository}; +use composefs::shared_internals::IO_BUF_CAPACITY; + +use crate::skopeo::TAR_LAYER_CONTENT_TYPE; +use crate::tar::split_async; + +/// Check if a media type represents a tar-based layer. +pub fn is_tar_media_type(media_type: &MediaType) -> bool { + matches!( + media_type, + MediaType::ImageLayer + | MediaType::ImageLayerGzip + | MediaType::ImageLayerZstd + | MediaType::ImageLayerNonDistributable + | MediaType::ImageLayerNonDistributableGzip + | MediaType::ImageLayerNonDistributableZstd + ) +} + +/// Wrap an async reader with the appropriate decompressor for the media type. +/// +/// Returns a boxed reader that decompresses the stream if needed. +/// The output is `AsyncRead` (not `AsyncBufRead`) because `split_async` +/// does its own buffering via `BytesMut`. +pub fn decompress_async<'a, R>( + reader: R, + media_type: &MediaType, +) -> Result> +where + R: AsyncRead + Unpin + Send + 'a, +{ + let buf = BufReader::new(reader); + let reader: Box = match media_type { + MediaType::ImageLayer | MediaType::ImageLayerNonDistributable => { + Box::new(BufReader::with_capacity(IO_BUF_CAPACITY, buf)) + } + MediaType::ImageLayerGzip | MediaType::ImageLayerNonDistributableGzip => Box::new( + BufReader::with_capacity(IO_BUF_CAPACITY, GzipDecoder::new(buf)), + ), + MediaType::ImageLayerZstd | MediaType::ImageLayerNonDistributableZstd => Box::new( + BufReader::with_capacity(IO_BUF_CAPACITY, ZstdDecoder::new(buf)), + ), + _ => bail!("Unsupported layer media type for decompression: {media_type}"), + }; + Ok(reader) +} + +/// Import a tar layer from an async reader into the repository. +/// +/// The reader should already be decompressed (use `decompress_async` first). 
+/// Returns the fs-verity object ID and import stats of the imported splitstream. +pub async fn import_tar_async( + repo: Arc>, + reader: R, +) -> Result<(ObjectID, crate::ImportStats)> +where + ObjectID: FsVerityHashValue, + R: AsyncRead + Unpin + Send, +{ + split_async(reader, repo, TAR_LAYER_CONTENT_TYPE).await +} + +/// Store raw bytes from an async reader as a repository object. +/// +/// Streams the raw bytes into a repository object without creating a splitstream. +/// Use this for non-tar blobs (OCI artifacts) where the caller will create +/// the splitstream wrapper. +/// +/// Returns (object_id, size, store_method) of the stored object. +pub async fn store_blob_async( + repo: &Repository, + mut reader: R, +) -> Result<(ObjectID, u64, ObjectStoreMethod)> +where + ObjectID: FsVerityHashValue, + R: AsyncRead + Unpin, +{ + let tmpfile = repo.create_object_tmpfile()?; + let mut writer = tokio::fs::File::from(std::fs::File::from(tmpfile)); + let size = tokio::io::copy(&mut reader, &mut writer).await?; + writer.flush().await?; + let tmpfile = writer.into_std().await; + let (object_id, method) = repo.finalize_object_tmpfile(tmpfile, size)?; + Ok((object_id, size, method)) +} diff --git a/crates/composefs-oci/src/lib.rs b/crates/composefs-oci/src/lib.rs index 65f1a837..f6841a5a 100644 --- a/crates/composefs-oci/src/lib.rs +++ b/crates/composefs-oci/src/lib.rs @@ -13,7 +13,9 @@ #![forbid(unsafe_code)] pub mod image; +pub mod layer; pub mod oci_image; +pub mod oci_layout; pub mod skopeo; pub mod tar; @@ -21,7 +23,7 @@ use std::{collections::HashMap, sync::Arc}; use anyhow::{bail, ensure, Context, Result}; use containers_image_proxy::ImageProxyConfig; -use oci_spec::image::ImageConfiguration; +use oci_spec::image::{Descriptor, ImageConfiguration, MediaType}; use sha2::{Digest, Sha256}; use composefs::{ @@ -175,6 +177,28 @@ pub async fn pull( }) } +/// Extract ordered diff_ids from a config descriptor. 
+/// +/// For standard container images (ImageConfig media type), parses the +/// config JSON and returns `rootfs.diff_ids`. For artifacts with +/// non-standard config types, falls back to using manifest layer +/// digests as identifiers. +pub(crate) fn extract_diff_ids( + media_type: &MediaType, + config_bytes: &[u8], + manifest_layers: &[Descriptor], +) -> Result> { + if *media_type == MediaType::ImageConfig { + let config = ImageConfiguration::from_reader(config_bytes)?; + Ok(config.rootfs().diff_ids().to_vec()) + } else { + Ok(manifest_layers + .iter() + .map(|d| d.digest().to_string()) + .collect()) + } +} + fn hash(bytes: &[u8]) -> String { let mut context = Sha256::new(); context.update(bytes); @@ -235,8 +259,12 @@ pub fn write_config( let json_bytes = json.as_bytes(); let config_digest = hash(json_bytes); let mut stream = repo.create_stream(OCI_CONFIG_CONTENT_TYPE); - for (name, value) in &refs { - stream.add_named_stream_ref(name, value) + // Add refs in config-defined diff_id order for deterministic output + for diff_id in config.rootfs().diff_ids() { + let value = refs + .get(diff_id.as_str()) + .with_context(|| format!("missing layer verity for diff_id {diff_id}"))?; + stream.add_named_stream_ref(diff_id, value); } stream.write_external(json_bytes)?; let id = repo.write_stream(stream, &config_identifier(&config_digest), None)?; @@ -343,6 +371,64 @@ mod test { "); } + #[tokio::test] + async fn test_layer_import_stats() { + let layer = example_layer(); + let mut context = Sha256::new(); + context.update(&layer); + let layer_id = format!("sha256:{}", hex::encode(context.finalize())); + + let repo_dir = tempdir(); + let repo = Arc::new(Repository::::open_path(CWD, &repo_dir).unwrap()); + let (_id, stats) = import_layer(&repo, &layer_id, Some("name"), &layer[..]) + .await + .unwrap(); + + // The example layer has files of sizes 0, 4095, 4096, 4097. + // Files > INLINE_CONTENT_MAX (64 bytes) are stored as external objects. 
+ // So 4095, 4096, and 4097 are all external → 3 objects copied. + assert_eq!( + stats.objects_copied, 3, + "three files above inline threshold should be external objects" + ); + assert_eq!(stats.objects_already_present, 0); + assert!( + stats.bytes_copied > 0, + "bytes_copied should be nonzero for external objects" + ); + assert!( + stats.bytes_inlined > 0, + "bytes_inlined should be nonzero (tar headers + small file)" + ); + } + + #[tokio::test] + async fn test_layer_import_deduplication_stats() { + let layer = example_layer(); + let mut context = Sha256::new(); + context.update(&layer); + let layer_id = format!("sha256:{}", hex::encode(context.finalize())); + + let repo_dir = tempdir(); + let repo = Arc::new(Repository::::open_path(CWD, &repo_dir).unwrap()); + + // First import + let (_id, stats1) = import_layer(&repo, &layer_id, None, &layer[..]) + .await + .unwrap(); + assert_eq!(stats1.objects_copied, 3); + assert_eq!(stats1.objects_already_present, 0); + + // Re-import the same layer — the stream already exists so we get + // an early return with zero stats (idempotent). 
+ let (_id, stats2) = import_layer(&repo, &layer_id, None, &layer[..]) + .await + .unwrap(); + assert_eq!(stats2.objects_copied, 0); + assert_eq!(stats2.objects_already_present, 0); + assert_eq!(stats2.bytes_copied, 0); + } + #[test] fn test_write_and_open_config() { use oci_spec::image::{ImageConfigurationBuilder, RootFsBuilder}; @@ -440,6 +526,75 @@ mod test { ); } + #[tokio::test] + async fn test_config_verity_deterministic() -> Result<()> { + use oci_spec::image::{ImageConfigurationBuilder, RootFsBuilder}; + + let repo_dir = tempdir(); + let repo = Arc::new(Repository::::open_path(CWD, &repo_dir).unwrap()); + + // Create 3 distinct layers with different content + let mut layers = Vec::new(); + for (name, size) in [("alpha", 1000), ("beta", 2000), ("gamma", 3000)] { + let mut builder = ::tar::Builder::new(vec![]); + append_data(&mut builder, name, size); + let layer = builder.into_inner().unwrap(); + + let mut context = Sha256::new(); + context.update(&layer); + let diff_id = format!("sha256:{}", hex::encode(context.finalize())); + + let (verity, _stats) = import_layer(&repo, &diff_id, None, &mut layer.as_slice()) + .await + .unwrap(); + layers.push((diff_id, verity)); + } + + let diff_ids: Vec = layers.iter().map(|(d, _)| d.clone()).collect(); + let config = ImageConfigurationBuilder::default() + .architecture("amd64") + .os("linux") + .rootfs( + RootFsBuilder::default() + .typ("layers") + .diff_ids(diff_ids.clone()) + .build() + .unwrap(), + ) + .build() + .unwrap(); + + // Build refs HashMaps with different insertion orders to exercise + // that write_config uses config-defined diff_id order, not HashMap order. 
+ let refs1: HashMap, Sha256HashValue> = layers + .iter() + .map(|(d, v)| (d.as_str().into(), v.clone())) + .collect(); + let refs2: HashMap, Sha256HashValue> = layers + .iter() + .rev() + .map(|(d, v)| (d.as_str().into(), v.clone())) + .collect(); + + let (_digest1, verity1) = write_config(&repo, &config, refs1)?; + let (_digest2, verity2) = write_config(&repo, &config, refs2)?; + + // The verity must be identical regardless of HashMap iteration order + assert_eq!( + verity1, verity2, + "config verity must be deterministic across calls" + ); + + // Hardcoded expected value to catch any accidental changes + assert_eq!( + verity1.to_hex(), + "7752e739a60d1e697ac96545fc23d9a6e5f254625fd313a146722781cef0ffac", + "config verity changed unexpectedly" + ); + + Ok(()) + } + #[test] fn test_open_config_bad_hash() { use oci_spec::image::{ImageConfigurationBuilder, RootFsBuilder}; diff --git a/crates/composefs-oci/src/oci_image.rs b/crates/composefs-oci/src/oci_image.rs index aa125d07..534489a9 100644 --- a/crates/composefs-oci/src/oci_image.rs +++ b/crates/composefs-oci/src/oci_image.rs @@ -51,6 +51,7 @@ use sha2::{Digest, Sha256}; use composefs::{fsverity::FsVerityHashValue, repository::Repository}; +use crate::layer::is_tar_media_type; use crate::skopeo::{OCI_BLOB_CONTENT_TYPE, OCI_CONFIG_CONTENT_TYPE, OCI_MANIFEST_CONTENT_TYPE}; /// Data and named refs from a splitstream with external object storage. @@ -550,12 +551,12 @@ pub fn list_images( /// a signature can reference the fsverity digest of the manifest content directly. /// /// The manifest becomes a GC root only if a `reference` name is provided. 
-pub fn write_manifest( +pub fn write_manifest>( repo: &Arc>, manifest: &ImageManifest, manifest_digest: &str, config_verity: &ObjectID, - layer_verities: &HashMap, ObjectID>, + layer_verities: &[(S, ObjectID)], reference: Option<&str>, ) -> Result<(String, ObjectID)> { let content_id = manifest_identifier(manifest_digest); @@ -583,7 +584,7 @@ pub fn write_manifest( stream.add_named_stream_ref(&config_key, config_verity); for (diff_id, verity) in layer_verities { - stream.add_named_stream_ref(diff_id, verity); + stream.add_named_stream_ref(diff_id.as_ref(), verity); } stream.write_external(json_bytes)?; @@ -607,19 +608,6 @@ pub fn manifest_identifier(digest: &str) -> String { format!("oci-manifest-{digest}") } -/// Returns true if this is a tar-based layer media type. -pub(crate) fn is_tar_media_type(media_type: &MediaType) -> bool { - matches!( - media_type, - MediaType::ImageLayer - | MediaType::ImageLayerGzip - | MediaType::ImageLayerZstd - | MediaType::ImageLayerNonDistributable - | MediaType::ImageLayerNonDistributableGzip - | MediaType::ImageLayerNonDistributableZstd - ) -} - /// Returns the reference path for an OCI name. 
fn oci_ref_path(name: &str) -> String { format!("{OCI_REF_PREFIX}{}", encode_tag(name)) @@ -1148,8 +1136,7 @@ mod test { .build() .unwrap(); - let mut layer_verities = HashMap::new(); - layer_verities.insert(layer_digest.into_boxed_str(), layer_verity); + let layer_verities = [(layer_digest, layer_verity)]; let manifest_json = manifest.to_string().unwrap(); let manifest_digest = hash(manifest_json.as_bytes()); @@ -1412,9 +1399,8 @@ mod test { .build() .unwrap(); - let mut layer_verities = HashMap::new(); // For artifacts, we use the blob digest as the "diff_id" equivalent - layer_verities.insert(blob_digest.clone().into_boxed_str(), blob_verity.clone()); + let layer_verities = [(blob_digest.clone(), blob_verity.clone())]; let manifest_json = manifest.to_string().unwrap(); let manifest_digest = hash(manifest_json.as_bytes()); @@ -1525,8 +1511,7 @@ mod test { // Store manifest — layer_verities uses the layer digest as key // (same logic as ensure_config_with_layers when !is_image_config) - let mut layer_verities = HashMap::new(); - layer_verities.insert(layer_digest.clone().into_boxed_str(), layer_verity.clone()); + let layer_verities = [(layer_digest.clone(), layer_verity.clone())]; let manifest_json = manifest.to_string().unwrap(); let manifest_digest = hash(manifest_json.as_bytes()); @@ -1681,8 +1666,7 @@ mod test { .build() .unwrap(); - let mut layer_verities = HashMap::new(); - layer_verities.insert(diff_id.clone().into_boxed_str(), layer_verity); + let layer_verities = [(diff_id.clone(), layer_verity)]; let manifest_json = manifest.to_string().unwrap(); let manifest_digest = hash(manifest_json.as_bytes()); @@ -2079,11 +2063,7 @@ mod test { .build() .unwrap(); - let mut layer_verities = HashMap::new(); - layer_verities.insert( - shared_layer_digest.clone().into_boxed_str(), - shared_layer_verity.clone(), - ); + let layer_verities = [(shared_layer_digest.clone(), shared_layer_verity.clone())]; let manifest_json = manifest.to_string().unwrap(); let manifest_digest 
= hash(manifest_json.as_bytes()); @@ -2243,8 +2223,7 @@ mod test { .build() .unwrap(); - let mut layer_verities = HashMap::new(); - layer_verities.insert(blob_digest.into_boxed_str(), blob_verity); + let layer_verities = [(blob_digest, blob_verity)]; let manifest_json = manifest.to_string().unwrap(); let manifest_digest = hash(manifest_json.as_bytes()); @@ -2319,8 +2298,7 @@ mod test { .build() .unwrap(); - let mut layer_verities = HashMap::new(); - layer_verities.insert(blob_digest.into_boxed_str(), blob_verity); + let layer_verities = [(blob_digest, blob_verity)]; let manifest_json = manifest.to_string().unwrap(); let manifest_digest = hash(manifest_json.as_bytes()); diff --git a/crates/composefs-oci/src/oci_layout.rs b/crates/composefs-oci/src/oci_layout.rs new file mode 100644 index 00000000..160aaff6 --- /dev/null +++ b/crates/composefs-oci/src/oci_layout.rs @@ -0,0 +1,429 @@ +//! Direct OCI layout directory import without the skopeo proxy. +//! +//! This module provides a fast path for importing images from local OCI layout +//! directories (the `oci:` transport). Instead of going through the +//! containers-image-proxy (which spawns skopeo as a subprocess), we read the +//! OCI layout directly using the `ocidir` crate. +//! +//! This is significantly faster for local imports since: +//! - No subprocess overhead from skopeo +//! - No IPC/pipe overhead for blob streaming +//! - Direct file I/O instead of proxy protocol parsing +//! +//! The import produces identical results to the proxy path: the same +//! splitstream format with the same content identifiers. 
/// Parse an OCI layout reference like "/path/to/dir:tag" or "/path/to/dir".
///
/// The format is `path[:tag]`. Paths can themselves contain colons (Windows
/// drives, unusual Unix paths), so a colon only acts as a tag separator
/// when no `/` follows it — i.e. when the last colon comes after the last
/// slash.
///
/// Returns (path, optional_tag).
pub(crate) fn parse_oci_layout_ref(imgref: &str) -> (&str, Option<&str>) {
    match imgref.rsplit_once(':') {
        // A slash after the colon means the colon is part of the path.
        Some((_, rest)) if rest.contains('/') => (imgref, None),
        // No slash after the colon — it's a tag separator.
        Some((path, tag)) => (path, Some(tag)),
        // No colon at all — the whole reference is the path.
        None => (imgref, None),
    }
}
It reads the OCI +/// layout directly without going through skopeo. +#[context("Importing OCI layout from {}", layout_path.display())] +pub async fn import_oci_layout( + repo: &Arc>, + layout_path: &Path, + layout_tag: Option<&str>, + reference: Option<&str>, +) -> Result<(PullResult, ImportStats)> { + // Open the OCI layout directory + let dir = cap_std::fs::Dir::open_ambient_dir(layout_path, cap_std::ambient_authority()) + .with_context(|| format!("Opening OCI layout directory {}", layout_path.display()))?; + let ocidir = OciDir::open(dir).context("Opening OCI directory")?; + + // Resolve the manifest for this platform, peeling through manifest lists if needed + let resolved = ocidir + .open_image_this_platform(layout_tag) + .context("Resolving manifest for platform")?; + + let manifest = resolved.manifest; + let manifest_descriptor = &resolved.manifest_descriptor; + let manifest_digest = manifest_descriptor.digest().to_string(); + + // Import config and layers + let config_descriptor = manifest.config(); + let layers = manifest.layers(); + let (config_digest, config_verity, layer_refs, stats) = + import_config_and_layers(repo, &ocidir, layers, config_descriptor) + .await + .with_context(|| format!("Failed to import config {}", config_descriptor.digest()))?; + + // Store the manifest + let manifest_content_id = manifest_identifier(&manifest_digest); + let manifest_verity = if let Some(verity) = repo.has_stream(&manifest_content_id)? 
{ + debug!("Already have manifest {manifest_digest}"); + verity + } else { + debug!("Storing manifest {manifest_digest}"); + + let mut splitstream = repo.create_stream(OCI_MANIFEST_CONTENT_TYPE); + + let config_key = format!("config:{}", config_descriptor.digest()); + splitstream.add_named_stream_ref(&config_key, &config_verity); + + // Add layer refs in config-defined diff_id order + for (diff_id, verity) in &layer_refs { + splitstream.add_named_stream_ref(diff_id, verity); + } + + let raw_manifest = + read_blob_bytes(&ocidir, manifest_descriptor).context("Reading raw manifest bytes")?; + splitstream.write_external(&raw_manifest)?; + repo.write_stream(splitstream, &manifest_content_id, None)? + }; + + // Tag if requested + if let Some(name) = reference { + tag_image(repo, &manifest_digest, name)?; + } + + Ok(( + PullResult { + manifest_digest, + manifest_verity, + config_digest, + config_verity, + }, + stats, + )) +} + +/// Import config and all layers from an OCI layout. +/// +/// Returns (config_digest, config_verity, layer_refs, stats). +/// `layer_refs` is an ordered Vec of (diff_id, verity) pairs preserving the +/// order from the config (or manifest for artifacts). +async fn import_config_and_layers( + repo: &Arc>, + ocidir: &OciDir, + manifest_layers: &[Descriptor], + config_descriptor: &Descriptor, +) -> Result<(String, ObjectID, Vec<(String, ObjectID)>, ImportStats)> { + let config_digest: &str = config_descriptor.digest().as_ref(); + let content_id = config_identifier(config_digest); + + if let Some(config_id) = repo.has_stream(&content_id)? 
{ + debug!("Already have container config {config_digest}"); + + let (data, named_refs) = crate::oci_image::read_external_splitstream( + repo, + &content_id, + Some(&config_id), + Some(OCI_CONFIG_CONTENT_TYPE), + )?; + let named_refs_map: HashMap<&str, ObjectID> = named_refs + .iter() + .map(|(k, v)| (k.as_ref(), v.clone())) + .collect(); + + let diff_ids = + crate::extract_diff_ids(config_descriptor.media_type(), &data, manifest_layers)?; + + let layer_refs: Vec<(String, ObjectID)> = diff_ids + .into_iter() + .map(|diff_id| { + let verity = named_refs_map + .get(diff_id.as_str()) + .with_context(|| format!("missing layer verity for diff_id {diff_id}"))?; + Ok((diff_id, verity.clone())) + }) + .collect::>()?; + + anyhow::ensure!( + layer_refs.len() == manifest_layers.len(), + "expected {} layer refs but got {}", + manifest_layers.len(), + layer_refs.len() + ); + + return Ok(( + config_digest.to_string(), + config_id, + layer_refs, + ImportStats::default(), + )); + } + + // Fetch config + debug!("Reading config {config_digest}"); + let raw_config = read_blob_bytes(ocidir, config_descriptor).context("Reading config blob")?; + + // Parse config to get diff_ids (if this is a container image). + // For artifacts with non-standard config types, falls back to layer digests. 
+ let diff_ids = + crate::extract_diff_ids(config_descriptor.media_type(), &raw_config, manifest_layers)?; + + // Sort layers by size for parallel fetching (largest first) + let mut layers: Vec<_> = manifest_layers.iter().zip(&diff_ids).collect(); + layers.sort_by_key(|(desc, _)| Reverse(desc.size())); + + let threads = available_parallelism()?; + let sem = Arc::new(Semaphore::new(threads.into())); + let mut layer_tasks = JoinSet::new(); + + for (idx, (descriptor, diff_id)) in layers.iter().enumerate() { + let diff_id = diff_id.to_string(); + let repo = Arc::clone(repo); + let permit = Arc::clone(&sem).acquire_owned().await?; + + let layer_file = ocidir + .read_blob(descriptor) + .with_context(|| format!("Opening layer blob {}", descriptor.digest()))?; + + let media_type = descriptor.media_type().clone(); + + layer_tasks.spawn(async move { + let _permit = permit; + let (verity, layer_stats) = + import_layer_from_file(&repo, &diff_id, layer_file, &media_type).await?; + anyhow::Ok((idx, diff_id, verity, layer_stats)) + }); + } + + // Collect results into a map keyed by diff_id for ordered lookup + let mut verity_map = HashMap::new(); + let mut stats = ImportStats::default(); + for result in layer_tasks.join_all().await { + let (_, diff_id, verity, layer_stats) = result?; + verity_map.insert(diff_id, verity); + stats.merge(&layer_stats); + } + + // Build ordered layer_refs from config-defined diff_id order + let layer_refs: Vec<(String, ObjectID)> = diff_ids + .into_iter() + .map(|diff_id| { + let verity = verity_map + .get(diff_id.as_str()) + .with_context(|| format!("missing layer verity for diff_id {diff_id}"))?; + Ok((diff_id, verity.clone())) + }) + .collect::>()?; + + anyhow::ensure!( + layer_refs.len() == manifest_layers.len(), + "expected {} layer refs but got {}", + manifest_layers.len(), + layer_refs.len() + ); + + let mut splitstream = repo.create_stream(OCI_CONFIG_CONTENT_TYPE); + for (diff_id, verity) in &layer_refs { + 
splitstream.add_named_stream_ref(diff_id, verity); + } + + splitstream.write_external(&raw_config)?; + let config_id = repo.write_stream(splitstream, &content_id, None)?; + + Ok((config_digest.to_string(), config_id, layer_refs, stats)) +} + +/// Import a single layer by streaming from a file handle. +async fn import_layer_from_file( + repo: &Arc>, + diff_id: &str, + layer_file: std::fs::File, + media_type: &MediaType, +) -> Result<(ObjectID, ImportStats)> { + let content_id = layer_identifier(diff_id); + + if let Some(layer_id) = repo.has_stream(&content_id)? { + debug!("Already have layer {diff_id}"); + return Ok((layer_id, ImportStats::default())); + } + + debug!("Importing layer {diff_id}"); + + // Convert std::fs::File to tokio::fs::File for async I/O + let async_file = tokio::fs::File::from_std(layer_file); + + let (object_id, layer_stats) = if is_tar_media_type(media_type) { + let reader = decompress_async(async_file, media_type)?; + import_tar_async(repo.clone(), reader).await? + } else { + // Non-tar blob: store as object and create splitstream wrapper + let (object_id, size, method) = store_blob_async(repo, async_file).await?; + + let mut stats = ImportStats::default(); + match method { + ObjectStoreMethod::Copied => { + stats.objects_copied += 1; + stats.bytes_copied += size; + } + ObjectStoreMethod::AlreadyPresent => { + stats.objects_already_present += 1; + } + } + + let mut stream = repo.create_stream(OCI_BLOB_CONTENT_TYPE); + stream.add_external_size(size); + stream.write_reference(object_id)?; + let stream_id = repo.write_stream(stream, &content_id, None)?; + return Ok((stream_id, stats)); + }; + + // Register the stream with its content identifier + repo.register_stream(&object_id, &content_id, None).await?; + + Ok((object_id, layer_stats)) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_oci_layout_ref() { + let cases: &[(&str, (&str, Option<&str>))] = &[ + ("/path/to/oci", ("/path/to/oci", None)), + ("./local/oci", 
("./local/oci", None)), + ("ocidir", ("ocidir", None)), + ("/path/to/oci:latest", ("/path/to/oci", Some("latest"))), + ("/path/to/oci:v1.0.0", ("/path/to/oci", Some("v1.0.0"))), + ("./local/oci:mytag", ("./local/oci", Some("mytag"))), + ("ocidir:latest", ("ocidir", Some("latest"))), + ("C:/path/to/oci", ("C:/path/to/oci", None)), + ("C:/path/to/oci:latest", ("C:/path/to/oci", Some("latest"))), + ( + "/path/to/oci:tag:with:colons", + ("/path/to/oci:tag:with", Some("colons")), + ), + ("/path/to/oci:", ("/path/to/oci", Some(""))), + ("ocidir:", ("ocidir", Some(""))), + ("/path:middle/to/oci", ("/path:middle/to/oci", None)), + ( + "/path:middle/to/oci:tag", + ("/path:middle/to/oci", Some("tag")), + ), + ]; + for (input, expected) in cases { + assert_eq!(parse_oci_layout_ref(input), *expected, "input: {input}"); + } + } + + #[tokio::test] + async fn test_empty_manifest_list_rejected() { + use composefs::fsverity::Sha256HashValue; + use oci_spec::image::{DescriptorBuilder, ImageIndexBuilder, OciLayoutBuilder}; + use sha2::Digest; + + let tempdir = tempfile::tempdir().unwrap(); + let layout_path = tempdir.path(); + + let oci_layout = OciLayoutBuilder::default() + .image_layout_version("1.0.0".to_string()) + .build() + .unwrap(); + let oci_layout_path = layout_path.join("oci-layout"); + std::fs::write(&oci_layout_path, oci_layout.to_string().unwrap()).unwrap(); + + let blobs_dir = layout_path.join("blobs/sha256"); + std::fs::create_dir_all(&blobs_dir).unwrap(); + + let manifest_list = ImageIndexBuilder::default() + .schema_version(2u32) + .media_type(MediaType::ImageIndex) + .manifests(vec![]) + .build() + .unwrap(); + let manifest_list_json = manifest_list.to_string().unwrap(); + let manifest_list_digest = format!( + "sha256:{}", + hex::encode(sha2::Sha256::digest(manifest_list_json.as_bytes())) + ); + let blob_path = blobs_dir.join(&manifest_list_digest[7..]); + std::fs::write(&blob_path, &manifest_list_json).unwrap(); + + let manifest_list_desc = 
DescriptorBuilder::default() + .media_type(MediaType::ImageIndex) + .digest( + manifest_list_digest + .parse::() + .unwrap(), + ) + .size(manifest_list_json.len() as u64) + .build() + .unwrap(); + + let top_index = ImageIndexBuilder::default() + .schema_version(2u32) + .media_type(MediaType::ImageIndex) + .manifests(vec![manifest_list_desc]) + .build() + .unwrap(); + let index_path = layout_path.join("index.json"); + std::fs::write(&index_path, top_index.to_string().unwrap()).unwrap(); + + let repo_dir = tempfile::tempdir().unwrap(); + let repo = std::sync::Arc::new( + composefs::repository::Repository::::open_path( + rustix::fs::CWD, + repo_dir.path(), + ) + .unwrap(), + ); + + let result = import_oci_layout(&repo, layout_path, None, None).await; + let err = result.expect_err("should fail with no matching platform"); + let err_msg = format!("{err:#}"); + assert!( + err_msg.contains("No manifest found for platform") + || err_msg.contains("no platform info"), + "unexpected error: {err_msg}" + ); + } +} diff --git a/crates/composefs-oci/src/skopeo.rs b/crates/composefs-oci/src/skopeo.rs index e0c44e35..e93f859f 100644 --- a/crates/composefs-oci/src/skopeo.rs +++ b/crates/composefs-oci/src/skopeo.rs @@ -13,30 +13,25 @@ use std::{cmp::Reverse, process::Command, thread::available_parallelism}; use std::{iter::zip, sync::Arc}; use anyhow::{Context, Result}; -use async_compression::tokio::bufread::{GzipDecoder, ZstdDecoder}; use containers_image_proxy::{ - ConvertedLayerInfo, ImageProxy, ImageProxyConfig, OpenedImage, Transport, + ConvertedLayerInfo, ImageProxy, ImageProxyConfig, ImageReference, OpenedImage, Transport, }; use fn_error_context::context; use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; -use oci_spec::image::{Descriptor, ImageConfiguration, MediaType}; +use oci_spec::image::Descriptor; use rustix::process::geteuid; -use tokio::{ - io::{AsyncReadExt, AsyncWriteExt, BufReader}, - sync::Semaphore, - task::JoinSet, -}; +use tokio::{io::AsyncReadExt, 
sync::Semaphore, task::JoinSet}; use composefs::{ fsverity::FsVerityHashValue, repository::{ObjectStoreMethod, Repository}, - shared_internals::IO_BUF_CAPACITY, }; use crate::{ - config_identifier, layer_identifier, - oci_image::{is_tar_media_type, manifest_identifier, tag_image}, - tar::split_async, + config_identifier, + layer::{decompress_async, import_tar_async, is_tar_media_type, store_blob_async}, + layer_identifier, + oci_image::{manifest_identifier, tag_image}, ContentAndVerity, ImportStats, }; @@ -87,11 +82,10 @@ struct ImageOp { impl ImageOp { async fn new( repo: &Arc>, - imgref: &str, + image_ref: &ImageReference, img_proxy_config: Option, ) -> Result { - // Detect transport from image reference - let transport = Transport::try_from(imgref).context("Failed to get image transport")?; + let transport = image_ref.transport; // See https://github.com/containers/skopeo/issues/2563 let skopeo_cmd = if transport == Transport::ContainerStorage && !geteuid().is_root() { @@ -103,10 +97,22 @@ impl ImageOp { }; // See https://github.com/containers/skopeo/issues/2750 - let imgref = if let Some(hash) = imgref.strip_prefix("containers-storage:sha256:") { - &format!("containers-storage:{hash}") // yay temporary lifetime extension! + // ImageReference.name for containers-storage: is already without the + // transport prefix (e.g. "sha256:abc" not "containers-storage:sha256:abc"). + // Skopeo expects "abc" without the "sha256:" prefix for digest references. 
+ let fixup_ref; + let image_ref = if transport == Transport::ContainerStorage { + if let Some(hash) = image_ref.name.strip_prefix("sha256:") { + fixup_ref = ImageReference { + transport, + name: hash.to_string(), + }; + &fixup_ref + } else { + image_ref + } } else { - imgref + image_ref }; let config = match img_proxy_config { @@ -130,7 +136,10 @@ impl ImageOp { let proxy = containers_image_proxy::ImageProxy::new_with_config(config) .await .context("Creating ImageProxy")?; - let img = proxy.open_image(imgref).await.context("Opening image")?; + let img = proxy + .open_image_ref(image_ref) + .await + .context("Opening image")?; let progress = MultiProgress::new(); Ok(ImageOp { repo: Arc::clone(repo), @@ -196,40 +205,12 @@ impl ImageOp { let media_type = descriptor.media_type(); let (object_id, layer_stats) = if is_tar_media_type(media_type) { // Tar layers: decompress and split into a splitstream - let reader: Box = match media_type { - MediaType::ImageLayer | MediaType::ImageLayerNonDistributable => { - Box::new(BufReader::with_capacity(IO_BUF_CAPACITY, progress)) - } - MediaType::ImageLayerGzip | MediaType::ImageLayerNonDistributableGzip => { - Box::new(BufReader::with_capacity( - IO_BUF_CAPACITY, - GzipDecoder::new(BufReader::new(progress)), - )) - } - MediaType::ImageLayerZstd | MediaType::ImageLayerNonDistributableZstd => { - Box::new(BufReader::with_capacity( - IO_BUF_CAPACITY, - ZstdDecoder::new(BufReader::new(progress)), - )) - } - _ => unreachable!("is_tar_media_type returned true"), - }; - split_async(reader, self.repo.clone(), TAR_LAYER_CONTENT_TYPE).await? + let reader = decompress_async(progress, media_type)?; + import_tar_async(self.repo.clone(), reader).await? } else { - // Non-tar layers (OCI artifacts like SBOMs, disk images, - // etc.): stream the raw bytes into a repository object and - // create a splitstream with a single external reference. 
- // This avoids buffering arbitrarily large blobs in memory - // and lets callers get an fd to the object directly via - // open_object(). - let tmpfile = self.repo.create_object_tmpfile()?; - let mut writer = tokio::fs::File::from(std::fs::File::from(tmpfile)); - let mut reader = progress; - let size = tokio::io::copy(&mut reader, &mut writer).await?; - writer.flush().await?; - let tmpfile = writer.into_std().await; + // Non-tar layers (OCI artifacts): stream raw bytes to object store + let (object_id, size, method) = store_blob_async(&self.repo, progress).await?; driver.await?; - let (object_id, method) = self.repo.finalize_object_tmpfile(tmpfile, size)?; let mut stats = ImportStats::default(); match method { @@ -242,11 +223,10 @@ impl ImageOp { } } + // Create splitstream with external reference and register it let mut stream = self.repo.create_stream(OCI_BLOB_CONTENT_TYPE); stream.add_external_size(size); stream.write_reference(object_id)?; - // write_stream handles both object storage and stream - // registration, so we return directly. let stream_id = self.repo.write_stream(stream, &content_id, None)?; return Ok((stream_id, stats)); }; @@ -265,37 +245,54 @@ impl ImageOp { } /// Ensure config is present and return layer verities along with config info. + /// + /// Returns (config_digest, config_verity, layer_refs, stats). + /// `layer_refs` is an ordered Vec of (diff_id, verity) pairs preserving the + /// order from the config (or manifest for artifacts). 
async fn ensure_config_with_layers( self: &Arc, manifest_layers: &[Descriptor], descriptor: &Descriptor, - ) -> Result<( - String, - ObjectID, - // FIXME change this string to be Digest - actually we may want to go stronger and have a - // struct DiffID(Digest) newtype - std::collections::HashMap, - ImportStats, - )> { + ) -> Result<(String, ObjectID, Vec<(String, ObjectID)>, ImportStats)> { let config_digest: &str = descriptor.digest().as_ref(); let content_id = config_identifier(config_digest); if let Some(config_id) = self.repo.has_stream(&content_id)? { - // We already got this config - need to read the layer refs from it + // We already got this config - need to read the layer refs and diff_ids from it self.progress .println(format!("Already have container config {config_digest}"))?; - let stream = self.repo.open_stream( + let (data, named_refs) = crate::oci_image::read_external_splitstream( + &self.repo, &content_id, Some(&config_id), Some(OCI_CONFIG_CONTENT_TYPE), )?; - let layer_refs: std::collections::HashMap = stream - .into_named_refs() - .into_iter() - .map(|(k, v)| (k.to_string(), v)) + let named_refs_map: std::collections::HashMap<&str, ObjectID> = named_refs + .iter() + .map(|(k, v)| (k.as_ref(), v.clone())) .collect(); + let diff_ids = + crate::extract_diff_ids(descriptor.media_type(), &data, manifest_layers)?; + + let layer_refs: Vec<(String, ObjectID)> = diff_ids + .into_iter() + .map(|diff_id| { + let verity = named_refs_map + .get(diff_id.as_str()) + .with_context(|| format!("missing layer verity for diff_id {diff_id}"))?; + Ok((diff_id, verity.clone())) + }) + .collect::>()?; + + anyhow::ensure!( + layer_refs.len() == manifest_layers.len(), + "expected {} layer refs but got {}", + manifest_layers.len(), + layer_refs.len() + ); + Ok(( config_digest.to_string(), config_id, @@ -322,16 +319,8 @@ impl ImageOp { // custom media type — not a standard image config. In that case // there are no diff_ids, so we use the manifest layer digests. 
// [1]: https://github.com/opencontainers/image-spec/blob/main/artifacts-guidance.md - let is_image_config = *descriptor.media_type() == MediaType::ImageConfig; - let diff_ids: Vec = if is_image_config { - let config = ImageConfiguration::from_reader(&raw_config[..])?; - config.rootfs().diff_ids().to_vec() - } else { - manifest_layers - .iter() - .map(|d| d.digest().to_string()) - .collect() - }; + let diff_ids = + crate::extract_diff_ids(descriptor.media_type(), &raw_config, manifest_layers)?; // Sort layers by size for parallel fetching let mut layers: Vec<_> = zip(manifest_layers, &diff_ids).collect(); @@ -370,21 +359,36 @@ impl ImageOp { }); } - // Collect results and sort by index for deterministic ordering - let mut results: Vec<_> = layer_tasks - .join_all() - .await + // Collect results into a map keyed by diff_id for ordered lookup + let mut verity_map = std::collections::HashMap::new(); + let mut stats = ImportStats::default(); + for result in layer_tasks.join_all().await { + let (_, diff_id, verity, layer_stats) = result?; + verity_map.insert(diff_id, verity); + stats.merge(&layer_stats); + } + + // Build ordered layer_refs from config-defined diff_id order + let layer_refs: Vec<(String, ObjectID)> = diff_ids .into_iter() - .collect::>()?; - results.sort_by_key(|(idx, _, _, _)| *idx); + .map(|diff_id| { + let verity = verity_map + .get(diff_id.as_str()) + .with_context(|| format!("missing layer verity for diff_id {diff_id}"))?; + Ok((diff_id, verity.clone())) + }) + .collect::>()?; + + anyhow::ensure!( + layer_refs.len() == manifest_layers.len(), + "expected {} layer refs but got {}", + manifest_layers.len(), + layer_refs.len() + ); let mut splitstream = self.repo.create_stream(OCI_CONFIG_CONTENT_TYPE); - let mut layer_refs = std::collections::HashMap::new(); - let mut stats = ImportStats::default(); - for (_, diff_id, verity, layer_stats) in results { - splitstream.add_named_stream_ref(&diff_id, &verity); - layer_refs.insert(diff_id, verity); - 
stats.merge(&layer_stats); + for (diff_id, verity) in &layer_refs { + splitstream.add_named_stream_ref(diff_id, verity); } // Store config as external object for independent fsverity @@ -405,7 +409,7 @@ impl ImageOp { let manifest = oci_spec::image::ImageManifest::from_reader(raw_manifest.as_slice())?; let config_descriptor = manifest.config(); let layers = manifest.layers(); - let (config_digest, config_verity, layer_verities, stats) = self + let (config_digest, config_verity, layer_refs, stats) = self .ensure_config_with_layers(layers, config_descriptor) .await .with_context(|| format!("Failed to pull config {config_descriptor:?}"))?; @@ -424,7 +428,8 @@ impl ImageOp { let config_key = format!("config:{}", config_descriptor.digest()); splitstream.add_named_stream_ref(&config_key, &config_verity); - for (diff_id, verity) in &layer_verities { + // Add layer refs in config-defined diff_id order + for (diff_id, verity) in &layer_refs { splitstream.add_named_stream_ref(diff_id, verity); } @@ -451,6 +456,9 @@ impl ImageOp { /// Returns `PullResult` containing both manifest and config digests/verities. /// If `reference` is provided, the manifest is also stored under that name. /// +/// For `oci:` transport (local OCI layout directories), this uses a fast path +/// that reads the layout directly without going through the skopeo proxy. +/// /// Note: For backward compatibility, use `.into_config()` on the result to get /// the (config_digest, config_verity) tuple that was previously returned. 
pub async fn pull_image( @@ -459,7 +467,19 @@ pub async fn pull_image( reference: Option<&str>, img_proxy_config: Option, ) -> Result<(PullResult, ImportStats)> { - let op = Arc::new(ImageOp::new(repo, imgref, img_proxy_config).await?); + let image_ref = + ImageReference::try_from(imgref).context("Parsing image reference transport")?; + + // Fast path: read local OCI layout directories directly without skopeo + if image_ref.transport == Transport::OciDir { + let (path_str, layout_tag) = crate::oci_layout::parse_oci_layout_ref(&image_ref.name); + let layout_path = std::path::Path::new(path_str); + return crate::oci_layout::import_oci_layout(repo, layout_path, layout_tag, reference) + .await; + } + + // Standard path: use skopeo proxy for other transports + let op = Arc::new(ImageOp::new(repo, &image_ref, img_proxy_config).await?); let (result, stats) = op .pull() .await diff --git a/crates/integration-tests/Cargo.toml b/crates/integration-tests/Cargo.toml index 476a096c..f15f6b8a 100644 --- a/crates/integration-tests/Cargo.toml +++ b/crates/integration-tests/Cargo.toml @@ -14,11 +14,11 @@ path = "src/main.rs" [dependencies] anyhow = "1" -cap-std-ext = "4.0" +cap-std-ext = { workspace = true } composefs = { workspace = true } libtest-mimic = "0.8" linkme = "0.3" -ocidir = "0.6" +ocidir = { workspace = true } paste = "1" rustix = { version = "1.0.0", default-features = false, features = ["process"] } serde_json = "1.0"