Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions api/dataset/v1alpha1/dataset_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,16 @@ type DatasetSource struct {
// +kubebuilder:pruning:PreserveUnknownFields
// options is a map of key-value pairs that can be used to specify additional options for the dataset source, e.g. {"branch": "master"}
// supported keys for each type of dataset source are:
// - GIT: branch, commit, depth, submodules
// - S3: region, endpoint, provider
// - HTTP: any key-value pair will be passed to the underlying http client as http headers
// - GIT: branch, commit, depth, submodules, bandwidthLimit
// - S3: region, endpoint, provider, bandwidthLimit
// - HTTP: bandwidthLimit, any other key-value pair will be passed to the underlying http client as http headers
// - PVC:
// - NFS:
// - CONDA: requirements.txt, environment.yaml
// - CONDA: requirements.txt, environment.yaml, bandwidthLimit
// - REFERENCE:
// - HUGGING_FACE: repo, repoType, endpoint, include, exclude, revision
// - MODEL_SCOPE: repo, repoType, include, exclude, revision
// - HUGGING_FACE: repo, repoType, endpoint, include, exclude, revision, bandwidthLimit
// - MODEL_SCOPE: repo, repoType, include, exclude, revision, bandwidthLimit
// bandwidthLimit: Bandwidth limit for downloads in KiB/s, or use suffix B|K|M|G|T|P (e.g. "1M" for 1 MiB/s, "10M" for 10 MiB/s)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you have any solutions for other types of Datasets?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above - bandwidth limiting can be extended to other dataset types beyond HTTP and S3, though implementation complexity varies. The current approach leverages rclone's built-in --bwlimit flag, which is why it's straightforward for HTTP and S3. Other types would require external bandwidth limiting tools like trickle or traffic control.

Options map[string]string `json:"options,omitempty"`
}

Expand Down
5 changes: 3 additions & 2 deletions config/crd/bases/dataset.baizeai.io_datasets.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -157,14 +157,15 @@ spec:
options is a map of key-value pairs that can be used to specify additional options for the dataset source, e.g. {"branch": "master"}
supported keys for each type of dataset source are:
- GIT: branch, commit, depth, submodules, bandwidthLimit
- S3: region, endpoint, provider
- HTTP: any key-value pair will be passed to the underlying http client as http headers
- S3: region, endpoint, provider, bandwidthLimit
- HTTP: bandwidthLimit, any other key-value pair will be passed to the underlying http client as http headers
- PVC:
- NFS:
- CONDA: requirements.txt, environment.yaml, bandwidthLimit
- REFERENCE:
- HUGGING_FACE: repo, repoType, endpoint, include, exclude, revision, bandwidthLimit
- MODEL_SCOPE: repo, repoType, include, exclude, revision, bandwidthLimit
bandwidthLimit: Bandwidth limit for downloads in KiB/s, or use suffix B|K|M|G|T|P (e.g. "1M" for 1 MiB/s, "10M" for 10 MiB/s)
type: object
x-kubernetes-preserve-unknown-fields: true
type:
Expand Down
20 changes: 20 additions & 0 deletions config/samples/git-dataset-with-bandwidth-limit.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
apiVersion: dataset.baizeai.io/v1alpha1
kind: Dataset
metadata:
name: git-dataset-with-bandwidth-limit
namespace: default
spec:
source:
type: GIT
uri: https://github.com/kubernetes/kubernetes.git
options:
branch: "master"
depth: "1"
bandwidthLimit: "5M" # Limit to 5 MiB/s
volumeClaimTemplate:
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 10Gi
16 changes: 16 additions & 0 deletions config/samples/http-dataset-with-bandwidth-limit.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
apiVersion: dataset.baizeai.io/v1alpha1
kind: Dataset
metadata:
name: gpt2-train-data-limited
spec:
dataSyncRound: 1
mountOptions:
gid: 1000
mode: "0774"
path: /
uid: 1000
source:
type: HTTP
uri: http://baize-ai.daocloud.io/gpt2-train-data/
options:
bandwidthLimit: "10M" # Limit download bandwidth to 10 MiB/s
20 changes: 20 additions & 0 deletions config/samples/huggingface-dataset-with-bandwidth-limit.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
apiVersion: dataset.baizeai.io/v1alpha1
kind: Dataset
metadata:
name: huggingface-dataset-with-bandwidth-limit
namespace: default
spec:
source:
type: HUGGING_FACE
uri: huggingface://microsoft/DialoGPT-medium
options:
repoType: "model"
bandwidthLimit: "3M" # Limit to 3 MiB/s
secretRef: huggingface-credentials
volumeClaimTemplate:
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 5Gi
20 changes: 20 additions & 0 deletions config/samples/modelscope-dataset-with-bandwidth-limit.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
apiVersion: dataset.baizeai.io/v1alpha1
kind: Dataset
metadata:
name: modelscope-dataset-with-bandwidth-limit
namespace: default
spec:
source:
type: MODEL_SCOPE
uri: modelscope://damo/nlp_structbert_backbone_base_std
options:
repoType: "model"
bandwidthLimit: "8M" # Limit to 8 MiB/s
secretRef: modelscope-credentials
volumeClaimTemplate:
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 2Gi
19 changes: 19 additions & 0 deletions config/samples/s3-dataset-with-bandwidth-limit.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
apiVersion: dataset.baizeai.io/v1alpha1
kind: Dataset
metadata:
name: s3-dataset-limited
spec:
dataSyncRound: 1
mountOptions:
gid: 1000
mode: "0774"
path: /
uid: 1000
source:
type: S3
uri: s3://my-bucket/models/
options:
region: "us-east-1"
provider: "AWS"
bandwidthLimit: "5M" # Limit download bandwidth to 5 MiB/s
secretRef: s3-credentials
Binary file added data-loader
Binary file not shown.
6 changes: 4 additions & 2 deletions data-loader.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ RUN pip install --no-cache-dir "huggingface_hub[cli]"==0.33.1 modelscope==1.27.1
arch=$(uname -m | sed -E 's/x86_64/amd64/g;s/aarch64/arm64/g') && \
filename=rclone-${rclone_version}-linux-${arch} && \
wget https://github.com/rclone/rclone/releases/download/${rclone_version}/${filename}.zip -O ${filename}.zip && \
unzip ${filename}.zip && mv ${filename}/rclone /usr/local/bin && rm -rf ${filename} ${filename}.zip
unzip ${filename}.zip && mv ${filename}/rclone /usr/local/bin && rm -rf ${filename} ${filename}.zip && \
apt-get update && apt-get install -y trickle && rm -rf /var/lib/apt/lists/*


COPY --from=builder /workspace/data-loader /usr/local/bin/
COPY entrypoint.sh /usr/local/bin/

ENTRYPOINT ["/usr/local/bin/data-loader"]
ENTRYPOINT ["/usr/local/bin/entrypoint.sh"]
90 changes: 90 additions & 0 deletions entrypoint.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
#!/bin/bash
set -e

# convert_bandwidth_limit LIMIT
#
# Converts an rclone-style bandwidth limit into the whole-number KB/s rate that
# is passed to trickle, and prints that number on stdout.
#
# Arguments:
#   LIMIT  A non-negative number with an optional fractional part and an
#          optional single-letter suffix B, K, M, G, T or P (either case).
#          A bare number is treated as KiB/s. Suffixes are powers of 1024.
#
# Output:
#   stdout: the rate in KB/s. "0" means "no limit" and is printed for an empty
#           or malformed LIMIT. Any non-zero byte rate is rounded up to at
#           least 1 so a tiny limit is not mistaken for "no limit".
#   stderr: a diagnostic line when LIMIT is malformed.
#
# The function always returns status 0; callers decide based on the printed value.
convert_bandwidth_limit() {
    local limit="$1"

    if [[ -z "$limit" ]]; then
        echo "0"
        return
    fi

    # Groups: 1 = integer part, 3 = fractional digits (may be empty), 4 = suffix.
    # Both cases are accepted for the suffix; it is normalized to uppercase below.
    if [[ ! "$limit" =~ ^([0-9]+)(\.([0-9]+))?([BKMGTPbkmgtp]?)$ ]]; then
        echo "Error: Invalid bandwidth limit format: $limit" >&2
        echo "0"
        return
    fi

    local whole="${BASH_REMATCH[1]}"
    local fraction="${BASH_REMATCH[3]}"
    local suffix="${BASH_REMATCH[4]^^}"

    # Keep exactly three fractional digits (thousandths), truncating any extra
    # precision. Doing this on the string avoids floating-point rounding.
    fraction="${fraction}000"
    fraction="${fraction:0:3}"

    local unit
    case "$suffix" in
        "B")
            unit=1
            ;;
        ""|"K")
            # Plain number defaults to KiB/s for rclone compatibility
            unit=1024
            ;;
        "M")
            unit=$((1024 ** 2))
            ;;
        "G")
            unit=$((1024 ** 3))
            ;;
        "T")
            unit=$((1024 ** 4))
            ;;
        "P")
            unit=$((1024 ** 5))
            ;;
        *)
            echo "Error: Unsupported suffix: $suffix" >&2
            echo "0"
            return
            ;;
    esac

    # The 10# prefix forces base 10: without it bash reads a leading zero as
    # octal, so "010" would become 8 and "08" would be an arithmetic error.
    local bytes_per_second=$((10#$whole * unit + 10#$fraction * unit / 1000))

    # NOTE(review): this assumes trickle's KB/s means 1000 bytes per KB, as the
    # original script did - confirm against the trickle manual.
    local kbps=$((bytes_per_second / 1000))

    # Minimum 1 KB/s if > 0
    if [[ $bytes_per_second -gt 0 && $kbps -eq 0 ]]; then
        kbps=1
    fi

    echo "$kbps"
}

# Entry point: start data-loader, throttled through trickle only when the
# BANDWIDTH_LIMIT environment variable converts to a positive rate.

if [[ -z "$BANDWIDTH_LIMIT" ]]; then
    # Nothing requested: hand over to data-loader unchanged
    exec /usr/local/bin/data-loader "$@"
fi

echo "Bandwidth limit detected: $BANDWIDTH_LIMIT"

# Turn the rclone-style value into the KB/s figure passed to trickle
kbps=$(convert_bandwidth_limit "$BANDWIDTH_LIMIT")

if [[ "$kbps" -le 0 ]]; then
    echo "Invalid or zero bandwidth limit, running without trickle"
    exec /usr/local/bin/data-loader "$@"
fi

echo "Applying bandwidth limit: ${kbps} KB/s"
# The same cap is applied to download (-d) and upload (-u)
exec trickle -d "$kbps" -u "$kbps" /usr/local/bin/data-loader "$@"
15 changes: 14 additions & 1 deletion internal/controller/dataset/dataset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,8 +589,13 @@ func (r *DatasetReconciler) reconcileJob(ctx context.Context, ds *datasetv1alpha
}

options := make(map[string]string)
var bandwidthLimit string
for k, v := range ds.Spec.Source.Options {
options[k] = v
if k == "bandwidthLimit" {
bandwidthLimit = v
} else {
options[k] = v
}
}

podSpec := &jobSpec.Template.Spec
Expand Down Expand Up @@ -698,6 +703,14 @@ func (r *DatasetReconciler) reconcileJob(ctx context.Context, ds *datasetv1alpha

container.Args = args

// Set bandwidth limit as environment variable if specified
if bandwidthLimit != "" {
container.Env = append(container.Env, corev1.EnvVar{
Name: "BANDWIDTH_LIMIT",
Value: bandwidthLimit,
})
}

// 最终创建 Job
job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Expand Down
1 change: 1 addition & 0 deletions internal/pkg/datasources/datasource_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func (d *HTTPLoader) Sync(fromURI string, toPath string) error {
}

args = append(args, "-vvv")

cmd := exec.Command("rclone", args...)
cmd.Dir = d.Options.Root

Expand Down
1 change: 1 addition & 0 deletions internal/pkg/datasources/datasource_huggingface_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,4 @@ func TestHuggingFaceLoader(t *testing.T) {
assert.Equal(t, string(bbs[2]), "whoami\n")
assert.Equal(t, string(bbs[3]), strings.Join([]string{"download", "ns/model", "--local-dir", huggingFaceDir, "--resume-download"}, " ")+"\n")
}

1 change: 1 addition & 0 deletions internal/pkg/datasources/datasource_modelscope_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,4 @@ func TestModelScopeLoader(t *testing.T) {
assert.Equal(t, string(bbs[0]), "login --token test-token\n")
assert.Equal(t, string(bbs[1]), strings.Join([]string{"download", "ns/model", "--local_dir", modelScopeDir}, " ")+"\n")
}

1 change: 1 addition & 0 deletions internal/pkg/datasources/datasource_s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ func (d *S3Loader) Sync(fromURI string, toPath string) error {
}

args = append(args, "-vvv")

cmd := exec.Command("rclone", args...)
cmd.Dir = d.Options.Root

Expand Down
1 change: 1 addition & 0 deletions internal/pkg/datasources/datasource_s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,4 @@ func TestS3Loader(t *testing.T) {
assert.True(t, strings.HasPrefix(string(bbs[1]), "config create"))
assert.True(t, strings.HasPrefix(string(bbs[2]), "sync"))
}

Loading