16 changes: 16 additions & 0 deletions README.md
@@ -23,6 +23,7 @@ Among the distinguishing factors:
- While casync supports very small min chunk sizes, optimizations in desync require min chunk sizes larger than the window size of the rolling hash used (currently 48 bytes). The tool's default chunk sizes match the defaults used in casync, min 16k, avg 64k, max 256k.
- Allows FUSE mounting of blob indexes
- S3/GC protocol support to access chunk stores for read operations and some commands that write chunks
- OCI Registries for chunk storage using [ORAS](https://oras.land/docs/)
- Stores and retrieves index files from remote index stores such as HTTP, SFTP, Google Storage and S3
- Built-in HTTP(S) index server to read/write indexes
- Reflinking matching blocks (rather than copying) from seed files if supported by the filesystem (currently only Btrfs and XFS)
@@ -221,6 +222,14 @@ s3+https://s3.internal.company/bucket/prefix?lookup=dns
s3+https://example.com/bucket/prefix?lookup=auto
```

### OCI Registries as chunk stores

OCI registries can be used to store chunks. Use the `oci+https` scheme when pointing at OCI stores. If the registry does not support TLS, use `oci+http` instead. OCI stores only support SHA256 chunk IDs, so they must be used with `--digest=sha256`.

```text
oci+https://ghcr.io/myrepo
```
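
As a rough illustration of how such a location breaks down, the sketch below parses a store URL into the repository reference (host plus path, which is also the key looked up under `oci-credentials`) and a plain-HTTP flag. The `ociLocation` type and `parseOCILocation` function are hypothetical names used only for this example and are not part of desync; the sketch assumes the `oci+http` scheme is detected by its `+http` suffix.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// ociLocation is a hypothetical type for this example only.
type ociLocation struct {
	Repository string // registry host plus repository path, e.g. "ghcr.io/myrepo"
	PlainHTTP  bool   // true when the registry is reached without TLS
}

// parseOCILocation is a hypothetical helper, not part of desync. It splits a
// store location into the parts an OCI client would need.
func parseOCILocation(location string) (ociLocation, error) {
	u, err := url.Parse(location)
	if err != nil {
		return ociLocation{}, err
	}
	switch u.Scheme {
	case "oci+https", "oci+http":
		// supported schemes
	default:
		return ociLocation{}, fmt.Errorf("unsupported scheme %q", u.Scheme)
	}
	return ociLocation{
		Repository: u.Host + u.Path,
		PlainHTTP:  strings.HasSuffix(u.Scheme, "+http"),
	}, nil
}

func main() {
	locations := []string{
		"oci+https://ghcr.io/myrepo",
		"oci+http://localhost:5000/chunks", // assumed local test registry
	}
	for _, l := range locations {
		loc, err := parseOCILocation(l)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("%s -> repository=%s plainHTTP=%t\n", l, loc.Repository, loc.PlainHTTP)
	}
}
```

The repository string produced here has the same shape as the keys in the `oci-credentials` config section, which is why those keys are written without a scheme.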

### Compressed vs Uncompressed chunk stores

By default, desync reads and writes chunks in compressed form to all supported stores. This is in line with upstream casync's goal of storing in the most efficient way. It is however possible to change this behavior by providing desync with a config file (see Configuration section below). Disabling compression and storing chunks uncompressed may reduce latency in some use cases and improve performance. desync supports reading and writing uncompressed chunks to SFTP, S3, HTTP and local stores and caches. If more than one store is used, each of them can be configured independently. For example, it's possible to read compressed chunks from S3 while using a local uncompressed cache for best performance. However, care needs to be taken when using the `chunk-server` command and building chains of chunk store proxies to avoid shifting the decompression load onto the server (it's possible this is actually desirable).
@@ -242,6 +251,7 @@ For most use cases, it is sufficient to use the tool's default configuration not
Available configuration values:

- `s3-credentials` - Defines credentials for use with S3 stores. Especially useful if more than one S3 store is used. The key in the config needs to be the URL scheme and host used for the store, excluding the path, but including the port number if used in the store URL. The key can also contain glob patterns, and the available wildcards are `*`, `?` and `[…]`. Please refer to the [filepath.Match](https://pkg.go.dev/path/filepath#Match) documentation for additional information. It is also possible to use a [standard aws credentials file](https://docs.aws.amazon.com/cli/latest/userguide/cli-config-files.html) in order to store s3 credentials.
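- `oci-credentials` - Defines credentials for use with OCI registry stores. The key in the config needs to be the registry host and repository path used in the store URL, without the URL scheme, for example `ghcr.io/myuser/repo`.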
- `store-options` - Allows customization of chunk and index stores, for example compression settings, timeouts, retry behavior and keys. Not all options are applicable to every store; some of these, like `timeout`, are ignored for local stores. Some of these options, such as the client certificates, are overridden by any values set on the command line. Note that the store location used on the command line needs to match the key under `store-options` exactly for these options to be used. As with `s3-credentials`, glob patterns are also supported. A configuration file where more than one key matches a single store location is considered invalid.
- `timeout` - Time limit for chunk read or write operation in nanoseconds. Default: 1 minute. If set to a negative value, timeout is infinite.
- `error-retry` - Number of times to retry failed chunk requests. Default: 0.
@@ -277,6 +287,12 @@ Available configuration values:
"aws-profile": "profile_refreshable"
}
},
"oci-credentials": {
"ghcr.io/myuser/repo": {
"username": "myuser",
"secret": "MYSECRET"
}
},
"store-options": {
"https://192.168.1.1/store": {
"client-cert": "/path/to/crt",
30 changes: 28 additions & 2 deletions cmd/desync/config.go
@@ -17,6 +17,7 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"oras.land/oras-go/v2/registry/remote/auth"
)

// S3Creds holds credentials or references to an S3 credentials file.
@@ -32,8 +33,18 @@ type S3Creds struct {
// Config is used to hold the global tool configuration. It's used to customize
// store features and provide credentials where needed.
type Config struct {
S3Credentials map[string]S3Creds `json:"s3-credentials"`
StoreOptions map[string]desync.StoreOptions `json:"store-options"`
S3Credentials map[string]S3Creds `json:"s3-credentials"`
OCICredentials map[string]OCICreds `json:"oci-credentials"`
StoreOptions map[string]desync.StoreOptions `json:"store-options"`
}

// OCICreds holds OCI credentials for a container registry store.
type OCICreds struct {
// Username for OCI store authentication.
Username string `json:"username,omitempty"`

// Secret (password or token) for OCI store authentication.
Secret string `json:"secret,omitempty"`
}

// GetS3CredentialsFor attempts to find creds and region for an S3 location in the
@@ -73,6 +84,21 @@ func (c Config) GetS3CredentialsFor(u *url.URL) (*credentials.Credentials, strin
return creds, region
}

// GetOCICredentialsFor looks up credentials for an OCI location in the config, keyed by
// registry host and repository path. It returns nil if no matching entry exists.
func (c Config) GetOCICredentialsFor(u *url.URL) auth.CredentialFunc {
key := u.Host + u.Path
credsConfig, ok := c.OCICredentials[key]
if !ok {
return nil
}
return func(ctx context.Context, hostport string) (auth.Credential, error) {
return auth.Credential{
Username: credsConfig.Username,
Password: credsConfig.Secret,
}, nil
}
}

// GetStoreOptionsFor returns optional config options for a specific store. Note that
// an error will be returned if the location string matches multiple entries in the
// config file.
6 changes: 6 additions & 0 deletions cmd/desync/store.go
@@ -149,6 +149,12 @@ func storeFromLocation(location string, cmdOpt cmdStoreOptions) (desync.Store, e
if err != nil {
return nil, err
}
case "oci+https", "oci+http":
creds := cfg.GetOCICredentialsFor(loc)
s, err = desync.NewOCIStore(loc, creds, opt)
if err != nil {
return nil, err
}
default:
local, err := desync.NewLocalStore(location, opt)
if err != nil {
6 changes: 6 additions & 0 deletions go.mod
@@ -28,6 +28,12 @@ require (
gopkg.in/cheggaaa/pb.v1 v1.0.28
)

require (
github.com/opencontainers/go-digest v1.0.0
github.com/opencontainers/image-spec v1.1.0
oras.land/oras-go/v2 v2.5.0
)

require (
cloud.google.com/go v0.110.0 // indirect
cloud.google.com/go/compute v1.19.1 // indirect
6 changes: 6 additions & 0 deletions go.sum
@@ -117,6 +117,10 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJ
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQb2IpWsCzug=
github.com/opencontainers/image-spec v1.1.0/go.mod h1:W4s4sFTMaBeK1BQLXbG4AdM2szdn85PY75RI83NrTrM=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/sftp v1.13.5 h1:a3RLUqkyjYRtBTZJZ1VRrKbN3zhuPLlUc3sphVz81go=
@@ -257,3 +261,5 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
oras.land/oras-go/v2 v2.5.0 h1:o8Me9kLY74Vp5uw07QXPiitjsw7qNXi8Twd+19Zf02c=
oras.land/oras-go/v2 v2.5.0/go.mod h1:z4eisnLP530vwIOUOJeBIj0aGI0L1C3d53atvCBqZHg=
136 changes: 136 additions & 0 deletions oci.go
@@ -0,0 +1,136 @@
package desync

import (
"bytes"
"context"
"crypto"
"crypto/tls"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"strings"

"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"oras.land/oras-go/v2/errdef"
"oras.land/oras-go/v2/registry/remote"
"oras.land/oras-go/v2/registry/remote/auth"
"oras.land/oras-go/v2/registry/remote/retry"
)

var _ WriteStore = OCIStore{}

// OCIStore operates on chunks in an OCI (Open Container Initiative) registry.
type OCIStore struct {
repo *remote.Repository
location string
opt StoreOptions
converters Converters
}

// NewOCIStore initializes a new chunk store backed by an OCI registry, accessed
// through ORAS (OCI Registry As Storage).
func NewOCIStore(u *url.URL, creds auth.CredentialFunc, opt StoreOptions) (OCIStore, error) {
// The OCI spec does not support desync's default hash algorithm (SHA512/256), so
// OCI stores can only be used when chunk IDs are SHA256 digests.
if Digest.Algorithm() != crypto.SHA256 {
return OCIStore{}, errors.New("OCI stores only support SHA256, use --digest=sha256")
}

repo, err := remote.NewRepository(u.Host + u.Path)
if err != nil {
return OCIStore{}, fmt.Errorf("failed to initialize oci registry store: %w", err)
}
baseTransport := http.DefaultTransport.(*http.Transport).Clone()
baseTransport.TLSClientConfig = &tls.Config{
InsecureSkipVerify: opt.TrustInsecure,
}
client := &auth.Client{
Client: &http.Client{
Transport: retry.NewTransport(baseTransport),
},
Credential: creds,
}
client.SetUserAgent("desync")
repo.Client = client
// Use plain HTTP for the "oci+http" scheme; "oci+https" keeps TLS enabled.
repo.PlainHTTP = strings.HasSuffix(u.Scheme, "+http")
s := OCIStore{
repo: repo,
location: u.String(),
opt: opt,
}
return s, nil
}

func (s OCIStore) String() string {
return s.location
}

// Close the store. NOP operation but needed to implement the store interface.
func (s OCIStore) Close() error { return nil }

// GetChunk reads and returns one chunk from the store
func (s OCIStore) GetChunk(id ChunkID) (*Chunk, error) {
descriptor, err := s.repo.Blobs().Resolve(context.Background(), ociReference(id))
if err != nil {
if errors.Is(err, errdef.ErrNotFound) {
return nil, ChunkMissing{id}
}
return nil, err
}
r, err := s.repo.Fetch(context.Background(), descriptor)
if err != nil {
if errors.Is(err, errdef.ErrNotFound) {
return nil, ChunkMissing{id}
}
return nil, err
}
defer r.Close()
b, err := io.ReadAll(r)
if err != nil {
return nil, err
}
return NewChunkFromStorage(id, b, s.converters, s.opt.SkipVerify)
}

// StoreChunk adds a new chunk to the store.
func (s OCIStore) StoreChunk(chunk *Chunk) error {
b, err := chunk.Data()
if err != nil {
return err
}
b, err = s.converters.toStorage(b)
if err != nil {
return err
}
descriptor := ociDescriptorForChunk(chunk.ID())
descriptor.Size = int64(len(b))
return s.repo.Push(context.Background(), descriptor, bytes.NewReader(b))
}

// HasChunk returns true if the chunk is in the store.
func (s OCIStore) HasChunk(id ChunkID) (bool, error) {
return s.repo.Exists(context.Background(), ociDescriptorForChunk(id))
}

// RemoveChunk deletes a chunk, typically an invalid one, from the store.
// Used when verifying and repairing caches.
func (s OCIStore) RemoveChunk(id ChunkID) error {
err := s.repo.Delete(context.Background(), ociDescriptorForChunk(id))
if errors.Is(err, errdef.ErrNotFound) {
return ChunkMissing{id}
}
return err
}

func ociDescriptorForChunk(id ChunkID) ocispec.Descriptor {
return ocispec.Descriptor{
Digest: digest.Digest(ociReference(id)),
MediaType: "application/vnd.oci.image.layer.v1.tar+zstd",
}
}

func ociReference(id ChunkID) string {
return "sha256:" + id.String()
}
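
The reference format built by `ociReference` can be illustrated with a standalone sketch. It assumes the chunk ID is the hex-encoded SHA256 of the chunk's plain data and ignores compression entirely. `ociReferenceFor` is a hypothetical stand-in that takes raw bytes rather than a desync `ChunkID`. The sketch also computes the SHA512/256 digest of the same data to show that, although both digests are 32 bytes long, their values differ, so one cannot be passed off as the other under a `sha256:` prefix.

```go
package main

import (
	"crypto/sha256"
	"crypto/sha512"
	"encoding/hex"
	"fmt"
)

// ociReferenceFor is a hypothetical stand-in for ociReference. It hashes the
// given bytes with SHA256 and formats the result as an OCI digest string.
func ociReferenceFor(data []byte) string {
	sum := sha256.Sum256(data)
	return "sha256:" + hex.EncodeToString(sum[:])
}

func main() {
	data := []byte("example chunk data")

	// Digest in the form an OCI registry expects for a blob.
	fmt.Println("oci reference:  ", ociReferenceFor(data))

	// desync's default algorithm yields a different 32-byte value for the
	// same input, which a registry would not accept as a sha256 digest.
	alt := sha512.Sum512_256(data)
	fmt.Println("sha512/256 hex: ", hex.EncodeToString(alt[:]))
}
```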