diff --git a/.gitignore b/.gitignore index 7328909..ad68edb 100644 --- a/.gitignore +++ b/.gitignore @@ -46,3 +46,5 @@ mine_test.log test-easy-output/ .cache/ .cargo/ +swe-forge.log +auto_publish.log diff --git a/AGENTS.md b/AGENTS.md index 66b8063..660946d 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -52,7 +52,7 @@ src/ ``` GH Archive (hourly dumps, 8x concurrent) → Pre-filter (merged PRs, no bots, org repos) - → GitHub API enrichment (10x concurrent, rate-limited 5000/h) + → GitHub API enrichment (20x concurrent, rate-limited 5000/h) → Local filter (language, stars, files changed) → LLM pre-classification (25x concurrent, title+body only) → Patch extraction + agentic test generation (8x concurrent) @@ -138,7 +138,7 @@ Git hooks are in `.githooks/` and activated via `git config core.hooksPath .gith 4. **Docker containers must have resource limits** — All container creation must use `apply_resource_limits()` from `src/docker/resources.rs`. Difficulty-based limits are enforced: PIDs (100–500), storage (1–5 GB), network mode (none/internal). Never create containers without limits. -5. **Respect GitHub API rate limits (5000 req/h)** — The pipeline uses semaphore-based concurrency (no chunk barriers). Each candidate needs ~2 API calls for enrichment. Never add unbounded concurrent GitHub API calls. Use the existing concurrency limits (enrichment: 10x, pre-classification: 25x, deep processing: 8x). +5. **Respect GitHub API rate limits (5000 req/h)** — The pipeline uses semaphore-based concurrency (no chunk barriers). Each candidate needs ~2 API calls for enrichment. Never add unbounded concurrent GitHub API calls. Use the existing concurrency limits (enrichment: 20x, pre-classification: 25x, deep processing: 8x). 6. **All async code must be `Send + Sync` compatible** — The codebase uses `Arc` extensively. Trait objects must be `Send + Sync`. Never introduce `Rc`, `RefCell`, or non-Send types in async contexts. diff --git a/auto_publish.sh b/auto_publish.sh new file mode 100755 index 0000000..812f5fa --- /dev/null +++ b/auto_publish.sh @@ -0,0 +1,58 @@ +#!/bin/bash +# Auto-publish script: uploads task directories to HuggingFace every 30 minutes +# The pipeline already uploads parquet shards in real-time. +# This script handles uploading the task workspace directories (prompt.md, workspace.yaml, tests/) +# that accumulate in generated-swe/ as tasks complete. + +HF_TOKEN="${HF_TOKEN:?Set HF_TOKEN environment variable}" +HF_REPO="CortexLM/swe-forge" +OUTPUT_DIR="generated-swe" +UPLOADED_MARKER=".hf_uploaded" +INTERVAL=1800 # 30 minutes + +upload_task_dir() { + local task_dir="$1" + local task_rel="$2" # relative path from OUTPUT_DIR (e.g. osism/container-image-inventory-reconciler-489) + local task_id=$(echo "$task_rel" | tr '/' '-') + + echo "[$(date)] Uploading task: $task_rel" + + find "$task_dir" -type f ! -name "$UPLOADED_MARKER" | while read -r filepath; do + local rel_path="${filepath#$task_dir/}" + local repo_path="tasks/${task_rel}/${rel_path}" + local content_b64=$(base64 -w0 "$filepath") + + curl -s -X POST "https://huggingface.co/api/datasets/${HF_REPO}/commit/main" \ + -H "Authorization: Bearer ${HF_TOKEN}" \ + -H "Content-Type: application/json" \ + -d "{\"summary\":\"Add task ${task_id}\",\"actions\":[{\"action\":\"file\",\"path\":\"${repo_path}\",\"content\":\"${content_b64}\",\"encoding\":\"base64\"}]}" \ + > /dev/null 2>&1 + done + + touch "${task_dir}/${UPLOADED_MARKER}" + echo "[$(date)] Uploaded task: $task_rel" +} + +echo "[$(date)] Auto-publish started (interval: ${INTERVAL}s)" + +while true; do + if [ -d "$OUTPUT_DIR" ]; then + # Find task directories recursively (they contain workspace.yaml) + find "$OUTPUT_DIR" -name "workspace.yaml" -type f 2>/dev/null | while read -r ws_file; do + task_dir=$(dirname "$ws_file") + [ -f "${task_dir}/${UPLOADED_MARKER}" ] && continue + # Compute relative path from OUTPUT_DIR + task_rel="${task_dir#$OUTPUT_DIR/}" + upload_task_dir "$task_dir" "$task_rel" + done + + task_count=$(find "$OUTPUT_DIR" -name "workspace.yaml" -type f 2>/dev/null | wc -l) + uploaded_count=$(find "$OUTPUT_DIR" -name "$UPLOADED_MARKER" -type f 2>/dev/null | wc -l) + echo "[$(date)] Status: ${uploaded_count}/${task_count} tasks uploaded to HF" + else + echo "[$(date)] Output directory not found yet: $OUTPUT_DIR" + fi + + echo "[$(date)] Sleeping ${INTERVAL}s until next publish cycle..." + sleep $INTERVAL +done diff --git a/src/cli/commands.rs b/src/cli/commands.rs index a77eb88..6c4f048 100644 --- a/src/cli/commands.rs +++ b/src/cli/commands.rs @@ -175,7 +175,7 @@ pub struct SweMineArgs { #[arg(long, default_value = "true")] pub validate_workspace: bool, - /// Override enrichment concurrency (default: 10). + /// Override enrichment concurrency (default: 20). #[arg(long)] pub concurrency_enrich: Option, @@ -231,7 +231,7 @@ pub struct SweBenchmarkArgs { #[arg(long, default_value = "gharchive")] pub source: String, - /// Override enrichment concurrency (default: 10). + /// Override enrichment concurrency (default: 20). #[arg(long)] pub concurrency_enrich: Option, diff --git a/src/export/dataset.rs b/src/export/dataset.rs index e341ac9..5291ce8 100644 --- a/src/export/dataset.rs +++ b/src/export/dataset.rs @@ -221,6 +221,40 @@ impl DatasetManager { } } + // Upload task directories (workspace.yaml, prompt.md, tests/) to HF under tasks/ + if let Some(ref uploader) = self.uploader { + let mut task_dirs = Vec::new(); + Self::find_task_dirs(&self.config.output_dir, &mut task_dirs); + for task_dir in &task_dirs { + let rel = task_dir + .strip_prefix(&self.config.output_dir) + .unwrap_or(task_dir); + let task_id = rel + .to_string_lossy() + .replace(std::path::MAIN_SEPARATOR, "/"); + let repo_prefix = format!("tasks/{}", task_id); + match uploader + .upload_directory(task_dir, &repo_prefix, &format!("Add task {}", task_id)) + .await + { + Ok(count) => { + tracing::info!( + task_id = %task_id, + files = count, + "Uploaded task directory to HF" + ); + } + Err(e) => { + tracing::warn!( + task_id = %task_id, + error = %e, + "Failed to upload task directory to HF" + ); + } + } + } + } + let summary = DatasetSummary { total_tasks: total, shard_count, @@ -239,6 +273,21 @@ impl DatasetManager { Ok(summary) } + fn find_task_dirs(dir: &Path, out: &mut Vec) { + if let Ok(entries) = std::fs::read_dir(dir) { + for entry in entries.filter_map(|e| e.ok()) { + let path = entry.path(); + if path.is_dir() { + if path.join("workspace.yaml").exists() { + out.push(path); + } else { + Self::find_task_dirs(&path, out); + } + } + } + } + } + fn generate_dataset_card(name: &str, hf_cfg: &HfUploadConfig) -> String { format!( r#"--- diff --git a/src/export/hf_uploader.rs b/src/export/hf_uploader.rs index 7770329..b1975f1 100644 --- a/src/export/hf_uploader.rs +++ b/src/export/hf_uploader.rs @@ -4,7 +4,7 @@ //! (including parquet) to a HuggingFace dataset repository. use reqwest::Client; -use serde::{Deserialize, Serialize}; +use serde::Serialize; use std::path::Path; use std::sync::Arc; use tokio::sync::Mutex; @@ -18,20 +18,6 @@ pub struct HfUploadConfig { pub private: bool, } -#[derive(Debug, Serialize)] -struct CreateRepoRequest { - #[serde(rename = "type")] - repo_type: String, - name: String, - private: bool, -} - -#[derive(Debug, Deserialize)] -#[allow(dead_code)] -struct CreateRepoResponse { - url: Option, -} - #[derive(Debug, Serialize)] struct CommitAction { action: String, @@ -68,15 +54,21 @@ impl HfUploader { pub async fn ensure_repo_exists(&self) -> anyhow::Result<()> { let url = format!("{}/repos/create", HF_API_BASE); - // Extract org/name for the API - let name = self.config.repo_id.clone(); - - let body = CreateRepoRequest { - repo_type: "dataset".to_string(), - name, - private: self.config.private, + let (organization, name) = if let Some((org, n)) = self.config.repo_id.split_once('/') { + (Some(org.to_string()), n.to_string()) + } else { + (None, self.config.repo_id.clone()) }; + let mut body = serde_json::json!({ + "type": "dataset", + "name": name, + "private": self.config.private, + }); + if let Some(org) = organization { + body["organization"] = serde_json::Value::String(org); + } + let resp = self .client .post(&url) @@ -87,12 +79,16 @@ impl HfUploader { let status = resp.status(); if status.is_success() || status.as_u16() == 409 { - // 409 = already exists, that's fine tracing::info!(repo = %self.config.repo_id, "HF dataset repo ready"); Ok(()) } else { let text = resp.text().await.unwrap_or_default(); - anyhow::bail!("Failed to create HF repo ({}): {}", status, text); + if text.contains("already created") || text.contains("already exist") { + tracing::info!(repo = %self.config.repo_id, "HF dataset repo already exists"); + Ok(()) + } else { + anyhow::bail!("Failed to create HF repo ({}): {}", status, text); + } } } @@ -228,6 +224,57 @@ impl HfUploader { .await } + /// Upload an entire directory tree to the HF repo. + /// `local_dir` is the directory on disk. + /// `repo_prefix` is the prefix path inside the repo (e.g. "tasks/my-task-id"). + pub async fn upload_directory( + &self, + local_dir: &Path, + repo_prefix: &str, + commit_message: &str, + ) -> anyhow::Result { + let mut file_pairs: Vec<(String, Vec)> = Vec::new(); + + fn walk(dir: &Path, prefix: &str, out: &mut Vec<(String, Vec)>) -> anyhow::Result<()> { + for entry in std::fs::read_dir(dir)? { + let entry = entry?; + let path = entry.path(); + let name = entry.file_name().to_string_lossy().to_string(); + let repo_path = if prefix.is_empty() { + name.clone() + } else { + format!("{}/{}", prefix, name) + }; + if path.is_dir() { + walk(&path, &repo_path, out)?; + } else if path.is_file() { + if let Ok(bytes) = std::fs::read(&path) { + out.push((repo_path, bytes)); + } + } + } + Ok(()) + } + + walk(local_dir, repo_prefix, &mut file_pairs)?; + + if file_pairs.is_empty() { + return Ok(0); + } + + let total = file_pairs.len(); + + for chunk in file_pairs.chunks(20) { + let refs: Vec<(&str, &[u8])> = chunk + .iter() + .map(|(p, b)| (p.as_str(), b.as_slice())) + .collect(); + self.upload_files(&refs, commit_message).await?; + } + + Ok(total) + } + pub fn repo_url(&self) -> String { format!("https://huggingface.co/datasets/{}", self.config.repo_id) } diff --git a/src/swe/pipeline.rs b/src/swe/pipeline.rs index bd39197..d7d23e1 100644 --- a/src/swe/pipeline.rs +++ b/src/swe/pipeline.rs @@ -329,7 +329,7 @@ impl SwePipeline { // Each event flows independently through: enrich -> filter -> pre-classify -> deep process. // Semaphores control concurrency at each stage. No chunk barriers. let deep_concurrency = config.concurrency_deep.unwrap_or(8); - let enrich_sem = Arc::new(Semaphore::new(config.concurrency_enrich.unwrap_or(10))); + let enrich_sem = Arc::new(Semaphore::new(config.concurrency_enrich.unwrap_or(20))); let preclassify_sem = Arc::new(Semaphore::new(config.concurrency_preclassify.unwrap_or(25))); let deep_sem = Arc::new(Semaphore::new(deep_concurrency));