diff --git a/api/dataset/v1alpha1/dataset_types.go b/api/dataset/v1alpha1/dataset_types.go
index 6ff010a..941a53b 100644
--- a/api/dataset/v1alpha1/dataset_types.go
+++ b/api/dataset/v1alpha1/dataset_types.go
@@ -70,15 +70,16 @@ type DatasetSource struct {
 	// +kubebuilder:pruning:PreserveUnknownFields
 	// options is a map of key-value pairs that can be used to specify additional options for the dataset source, e.g. {"branch": "master"}
 	// supported keys for each type of dataset source are:
-	// - GIT: branch, commit, depth, submodules
-	// - S3: region, endpoint, provider
-	// - HTTP: any key-value pair will be passed to the underlying http client as http headers
+	// - GIT: branch, commit, depth, submodules, bandwidthLimit
+	// - S3: region, endpoint, provider, bandwidthLimit
+	// - HTTP: bandwidthLimit, any other key-value pair will be passed to the underlying http client as http headers
 	// - PVC:
 	// - NFS:
-	// - CONDA: requirements.txt, environment.yaml
+	// - CONDA: requirements.txt, environment.yaml, bandwidthLimit
 	// - REFERENCE:
-	// - HUGGING_FACE: repo, repoType, endpoint, include, exclude, revision
-	// - MODEL_SCOPE: repo, repoType, include, exclude, revision
+	// - HUGGING_FACE: repo, repoType, endpoint, include, exclude, revision, bandwidthLimit
+	// - MODEL_SCOPE: repo, repoType, include, exclude, revision, bandwidthLimit
+	// bandwidthLimit: Bandwidth limit for downloads in KiB/s, or use suffix B|K|M|G|T|P (e.g. "1M" for 1 MiB/s, "10M" for 10 MiB/s)
 	Options map[string]string `json:"options,omitempty"`
 }
 
diff --git a/config/crd/bases/dataset.baizeai.io_datasets.yaml b/config/crd/bases/dataset.baizeai.io_datasets.yaml
index 7c20b10..5513596 100644
--- a/config/crd/bases/dataset.baizeai.io_datasets.yaml
+++ b/config/crd/bases/dataset.baizeai.io_datasets.yaml
@@ -157,14 +157,15 @@ spec:
                       options is a map of key-value pairs that can be used to specify additional options for the dataset source, e.g. {"branch": "master"}
                       supported keys for each type of dataset source are:
-                      - GIT: branch, commit, depth, submodules
-                      - S3: region, endpoint, provider
-                      - HTTP: any key-value pair will be passed to the underlying http client as http headers
+                      - GIT: branch, commit, depth, submodules, bandwidthLimit
+                      - S3: region, endpoint, provider, bandwidthLimit
+                      - HTTP: bandwidthLimit, any other key-value pair will be passed to the underlying http client as http headers
                       - PVC:
                       - NFS:
-                      - CONDA: requirements.txt, environment.yaml
+                      - CONDA: requirements.txt, environment.yaml, bandwidthLimit
                       - REFERENCE:
-                      - HUGGING_FACE: repo, repoType, endpoint, include, exclude, revision
-                      - MODEL_SCOPE: repo, repoType, include, exclude, revision
+                      - HUGGING_FACE: repo, repoType, endpoint, include, exclude, revision, bandwidthLimit
+                      - MODEL_SCOPE: repo, repoType, include, exclude, revision, bandwidthLimit
+                      bandwidthLimit: Bandwidth limit for downloads in KiB/s, or use suffix B|K|M|G|T|P (e.g. "1M" for 1 MiB/s, "10M" for 10 MiB/s)
                     type: object
                     x-kubernetes-preserve-unknown-fields: true
                   type:
diff --git a/config/samples/git-dataset-with-bandwidth-limit.yaml b/config/samples/git-dataset-with-bandwidth-limit.yaml
new file mode 100644
index 0000000..136391e
--- /dev/null
+++ b/config/samples/git-dataset-with-bandwidth-limit.yaml
@@ -0,0 +1,20 @@
+apiVersion: dataset.baizeai.io/v1alpha1
+kind: Dataset
+metadata:
+  name: git-dataset-with-bandwidth-limit
+  namespace: default
+spec:
+  source:
+    type: GIT
+    uri: https://github.com/kubernetes/kubernetes.git
+    options:
+      branch: "master"
+      depth: "1"
+      bandwidthLimit: "5M"  # Limit to 5 MiB/s
+  volumeClaimTemplate:
+    spec:
+      accessModes:
+        - ReadWriteOnce
+      resources:
+        requests:
+          storage: 10Gi
diff --git a/config/samples/http-dataset-with-bandwidth-limit.yaml b/config/samples/http-dataset-with-bandwidth-limit.yaml
new file mode 100644
index 0000000..f4f5155
--- /dev/null
+++ b/config/samples/http-dataset-with-bandwidth-limit.yaml
@@ -0,0 +1,16 @@
+apiVersion: dataset.baizeai.io/v1alpha1
+kind: Dataset
+metadata:
+  name: gpt2-train-data-limited
+spec:
+  dataSyncRound: 1
+  mountOptions:
+    gid: 1000
+    mode: "0774"
+    path: /
+    uid: 1000
+  source:
+    type: HTTP
+    uri: http://baize-ai.daocloud.io/gpt2-train-data/
+    options:
+      bandwidthLimit: "10M"  # Limit download bandwidth to 10 MiB/s
diff --git a/config/samples/huggingface-dataset-with-bandwidth-limit.yaml b/config/samples/huggingface-dataset-with-bandwidth-limit.yaml
new file mode 100644
index 0000000..5fbdf6a
--- /dev/null
+++ b/config/samples/huggingface-dataset-with-bandwidth-limit.yaml
@@ -0,0 +1,20 @@
+apiVersion: dataset.baizeai.io/v1alpha1
+kind: Dataset
+metadata:
+  name: huggingface-dataset-with-bandwidth-limit
+  namespace: default
+spec:
+  source:
+    type: HUGGING_FACE
+    uri: huggingface://microsoft/DialoGPT-medium
+    options:
+      repoType: "model"
+      bandwidthLimit: "3M"  # Limit to 3 MiB/s
+  secretRef: huggingface-credentials
+  volumeClaimTemplate:
+    spec:
+      accessModes:
+        - ReadWriteOnce
+      resources:
+        requests:
+          storage: 5Gi
diff --git a/config/samples/modelscope-dataset-with-bandwidth-limit.yaml b/config/samples/modelscope-dataset-with-bandwidth-limit.yaml
new file mode 100644
index 0000000..51b5db3
--- /dev/null
+++ b/config/samples/modelscope-dataset-with-bandwidth-limit.yaml
@@ -0,0 +1,20 @@
+apiVersion: dataset.baizeai.io/v1alpha1
+kind: Dataset
+metadata:
+  name: modelscope-dataset-with-bandwidth-limit
+  namespace: default
+spec:
+  source:
+    type: MODEL_SCOPE
+    uri: modelscope://damo/nlp_structbert_backbone_base_std
+    options:
+      repoType: "model"
+      bandwidthLimit: "8M"  # Limit to 8 MiB/s
+  secretRef: modelscope-credentials
+  volumeClaimTemplate:
+    spec:
+      accessModes:
+        - ReadWriteOnce
+      resources:
+        requests:
+          storage: 2Gi
diff --git a/config/samples/s3-dataset-with-bandwidth-limit.yaml b/config/samples/s3-dataset-with-bandwidth-limit.yaml
new file mode 100644
index 0000000..f223389
--- /dev/null
+++ b/config/samples/s3-dataset-with-bandwidth-limit.yaml
@@ -0,0 +1,19 @@
+apiVersion: dataset.baizeai.io/v1alpha1
+kind: Dataset
+metadata:
+  name: s3-dataset-limited
+spec:
+  dataSyncRound: 1
+  mountOptions:
+    gid: 1000
+    mode: "0774"
+    path: /
+    uid: 1000
+  source:
+    type: S3
+    uri: s3://my-bucket/models/
+    options:
+      region: "us-east-1"
+      provider: "AWS"
+      bandwidthLimit: "5M"  # Limit download bandwidth to 5 MiB/s
+  secretRef: s3-credentials
diff --git a/data-loader.Dockerfile b/data-loader.Dockerfile
index fac78fe..fcdf7c3 100644
--- a/data-loader.Dockerfile
+++ b/data-loader.Dockerfile
@@ -21,9 +21,11 @@ RUN pip install --no-cache-dir "huggingface_hub[cli]"==0.33.1 modelscope==1.27.1
     arch=$(uname -m | sed -E 's/x86_64/amd64/g;s/aarch64/arm64/g') && \
     filename=rclone-${rclone_version}-linux-${arch} && \
     wget https://github.com/rclone/rclone/releases/download/${rclone_version}/${filename}.zip -O ${filename}.zip && \
-    unzip ${filename}.zip && mv ${filename}/rclone /usr/local/bin && rm -rf ${filename} ${filename}.zip
+    unzip ${filename}.zip && mv ${filename}/rclone /usr/local/bin && rm -rf ${filename} ${filename}.zip && \
+    apt-get update && apt-get install -y --no-install-recommends trickle && rm -rf /var/lib/apt/lists/*
 
 COPY --from=builder /workspace/data-loader /usr/local/bin/
+COPY entrypoint.sh /usr/local/bin/
 
-ENTRYPOINT ["/usr/local/bin/data-loader"]
+ENTRYPOINT ["/usr/local/bin/entrypoint.sh"]
 
diff --git a/entrypoint.sh b/entrypoint.sh
new file mode 100755
index 0000000..c122f24
--- /dev/null
+++ b/entrypoint.sh
@@ -0,0 +1,101 @@
+#!/bin/bash
+# Entrypoint for the data-loader image.
+#
+# When BANDWIDTH_LIMIT is set (rclone-style size such as "512", "5M" or
+# "1.5G"), data-loader is started under trickle so that its downloads are
+# throttled; otherwise data-loader is exec'ed directly. An invalid or zero
+# limit falls back to an unthrottled run.
+#
+# NOTE(review): trickle shapes traffic through LD_PRELOAD, which only affects
+# dynamically linked programs. Confirm that the tools data-loader spawns
+# (rclone in particular) are really throttled for every source type.
+set -e
+
+# convert_bandwidth_limit LIMIT
+#   Print LIMIT as whole KB/s (1 KB = 1000 bytes) for trickle.
+#   LIMIT is a non-negative number with an optional, case-insensitive
+#   B|K|M|G|T|P suffix (powers of 1024); a bare number means KiB/s.
+#   Prints "0" when LIMIT is empty or malformed.
+convert_bandwidth_limit() {
+    local limit="$1"
+
+    if [[ -z "$limit" ]]; then
+        echo "0"
+        return
+    fi
+
+    # Split into number and optional suffix (either case).
+    if [[ "$limit" =~ ^([0-9]+(\.[0-9]+)?)([BKMGTPbkmgtp]?)$ ]]; then
+        local number="${BASH_REMATCH[1]}"
+        local suffix="${BASH_REMATCH[3]^^}" # Normalise to uppercase
+
+        # Bash arithmetic is integer-only: scale decimals by 1000 here and
+        # divide the factor back out after applying the unit multiplier.
+        local number_int
+        local decimal_multiplier
+        if [[ "$number" =~ \. ]]; then
+            number_int=$(awk "BEGIN {print int($number * 1000)}")
+            decimal_multiplier=1000
+        else
+            # Force base 10 so a leading zero ("010") is not read as octal.
+            number_int=$((10#$number))
+            decimal_multiplier=1
+        fi
+
+        local bytes_per_second
+        case "$suffix" in
+            "B")
+                bytes_per_second=$((number_int / decimal_multiplier))
+                ;;
+            ""|"K")
+                # Plain number defaults to KiB/s for rclone compatibility
+                bytes_per_second=$((number_int * 1024 / decimal_multiplier))
+                ;;
+            "M")
+                bytes_per_second=$((number_int * 1024 * 1024 / decimal_multiplier))
+                ;;
+            "G")
+                bytes_per_second=$((number_int * 1024 * 1024 * 1024 / decimal_multiplier))
+                ;;
+            "T")
+                bytes_per_second=$((number_int * 1024 * 1024 * 1024 * 1024 / decimal_multiplier))
+                ;;
+            "P")
+                bytes_per_second=$((number_int * 1024 * 1024 * 1024 * 1024 * 1024 / decimal_multiplier))
+                ;;
+        esac
+
+        # Convert bytes per second to KB/s (1 KB = 1000 bytes)
+        local kbps=$((bytes_per_second / 1000))
+
+        # Minimum 1 KB/s if > 0
+        if [[ $bytes_per_second -gt 0 && $kbps -eq 0 ]]; then
+            kbps=1
+        fi
+
+        echo "$kbps"
+    else
+        echo "Error: Invalid bandwidth limit format: $limit" >&2
+        echo "0"
+    fi
+}
+
+# Check if bandwidth limit is set via environment variable
+if [[ -n "$BANDWIDTH_LIMIT" ]]; then
+    echo "Bandwidth limit detected: $BANDWIDTH_LIMIT"
+
+    # Convert bandwidth limit to trickle format
+    kbps=$(convert_bandwidth_limit "$BANDWIDTH_LIMIT")
+
+    if [[ "$kbps" -gt 0 ]]; then
+        echo "Applying bandwidth limit: ${kbps} KB/s"
+        # -s: standalone mode, there is no trickled daemon in the container
+        exec trickle -s -d "$kbps" -u "$kbps" /usr/local/bin/data-loader "$@"
+    else
+        echo "Invalid or zero bandwidth limit, running without trickle"
+        exec /usr/local/bin/data-loader "$@"
+    fi
+else
+    # No bandwidth limit, run data-loader directly
+    exec /usr/local/bin/data-loader "$@"
+fi
diff --git a/internal/controller/dataset/dataset_controller.go b/internal/controller/dataset/dataset_controller.go
index e1debe0..43a744a 100644
--- a/internal/controller/dataset/dataset_controller.go
+++ b/internal/controller/dataset/dataset_controller.go
@@ -589,8 +589,13 @@ func (r *DatasetReconciler) reconcileJob(ctx context.Context, ds *datasetv1alpha
 	}
 
 	options := make(map[string]string)
+	var bandwidthLimit string
 	for k, v := range ds.Spec.Source.Options {
+		if k == "bandwidthLimit" {
+			bandwidthLimit = v
+			continue
+		}
 		options[k] = v
 	}
 
 	podSpec := &jobSpec.Template.Spec
@@ -698,6 +703,14 @@ func (r *DatasetReconciler) reconcileJob(ctx context.Context, ds *datasetv1alpha
 
 	container.Args = args
 
+	// Set bandwidth limit as environment variable if specified
+	if bandwidthLimit != "" {
+		container.Env = append(container.Env, corev1.EnvVar{
+			Name:  "BANDWIDTH_LIMIT",
+			Value: bandwidthLimit,
+		})
+	}
+
 	// 最终创建 Job
 	job := &batchv1.Job{
 		ObjectMeta: metav1.ObjectMeta{
diff --git a/internal/pkg/datasources/datasource_http.go b/internal/pkg/datasources/datasource_http.go
index b401033..a8639a2 100644
--- a/internal/pkg/datasources/datasource_http.go
+++ b/internal/pkg/datasources/datasource_http.go
@@ -125,6 +125,7 @@ func (d *HTTPLoader) Sync(fromURI string, toPath string) error {
 	}
 
 	args = append(args, "-vvv")
+
 	cmd := exec.Command("rclone", args...)
 	cmd.Dir = d.Options.Root
 
diff --git a/internal/pkg/datasources/datasource_s3.go b/internal/pkg/datasources/datasource_s3.go
index 23f1e07..4bbda07 100644
--- a/internal/pkg/datasources/datasource_s3.go
+++ b/internal/pkg/datasources/datasource_s3.go
@@ -205,6 +205,7 @@ func (d *S3Loader) Sync(fromURI string, toPath string) error {
 	}
 
 	args = append(args, "-vvv")
+
 	cmd := exec.Command("rclone", args...)
 	cmd.Dir = d.Options.Root
 