Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,5 @@ mine_test.log
test-easy-output/
.cache/
.cargo/
swe-forge.log
auto_publish.log
4 changes: 2 additions & 2 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ src/
```
GH Archive (hourly dumps, 8x concurrent)
→ Pre-filter (merged PRs, no bots, org repos)
→ GitHub API enrichment (10x concurrent, rate-limited 5000/h)
→ GitHub API enrichment (20x concurrent, rate-limited 5000/h)
→ Local filter (language, stars, files changed)
→ LLM pre-classification (25x concurrent, title+body only)
→ Patch extraction + agentic test generation (8x concurrent)
Expand Down Expand Up @@ -138,7 +138,7 @@ Git hooks are in `.githooks/` and activated via `git config core.hooksPath .gith

4. **Docker containers must have resource limits** — All container creation must use `apply_resource_limits()` from `src/docker/resources.rs`. Difficulty-based limits are enforced: PIDs (100–500), storage (1–5 GB), network mode (none/internal). Never create containers without limits.

5. **Respect GitHub API rate limits (5000 req/h)** — The pipeline uses semaphore-based concurrency (no chunk barriers). Each candidate needs ~2 API calls for enrichment. Never add unbounded concurrent GitHub API calls. Use the existing concurrency limits (enrichment: 10x, pre-classification: 25x, deep processing: 8x).
5. **Respect GitHub API rate limits (5000 req/h)** — The pipeline uses semaphore-based concurrency (no chunk barriers). Each candidate needs ~2 API calls for enrichment. Never add unbounded concurrent GitHub API calls. Use the existing concurrency limits (enrichment: 20x, pre-classification: 25x, deep processing: 8x).

6. **All async code must be `Send + Sync` compatible** — The codebase uses `Arc<dyn LlmProvider>` extensively. Trait objects must be `Send + Sync`. Never introduce `Rc`, `RefCell`, or non-Send types in async contexts.

Expand Down
58 changes: 58 additions & 0 deletions auto_publish.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#!/bin/bash
# Auto-publish script: uploads task directories to HuggingFace every 30 minutes
# The pipeline already uploads parquet shards in real-time.
# This script handles uploading the task workspace directories (prompt.md, workspace.yaml, tests/)
# that accumulate in generated-swe/ as tasks complete.

HF_TOKEN="${HF_TOKEN:?Set HF_TOKEN environment variable}"
HF_REPO="CortexLM/swe-forge"
OUTPUT_DIR="generated-swe"
UPLOADED_MARKER=".hf_uploaded"
INTERVAL=1800 # 30 minutes

# Upload every file of one task directory to the HF dataset repo.
#   $1 = task directory on disk
#   $2 = path relative to OUTPUT_DIR (e.g. osism/container-image-inventory-reconciler-489)
# The marker file is written only if EVERY file uploaded successfully, so a
# task with failed uploads is retried on the next cycle instead of being
# silently marked as done.
upload_task_dir() {
    local task_dir="$1"
    local task_rel="$2"
    local task_id
    task_id=$(echo "$task_rel" | tr '/' '-')
    local failed=0

    echo "[$(date)] Uploading task: $task_rel"

    # NOTE: read from process substitution, NOT `find | while` — a piped
    # while runs in a subshell and the `failed` flag would never propagate.
    while IFS= read -r filepath; do
        local rel_path="${filepath#"$task_dir"/}"
        local repo_path="tasks/${task_rel}/${rel_path}"
        local content_b64
        if ! content_b64=$(base64 -w0 "$filepath"); then
            echo "[$(date)] Failed to read/encode: $filepath" >&2
            failed=1
            continue
        fi

        # Capture the HTTP status instead of discarding all output, so a
        # rejected commit (401, 429, 5xx...) is detected and retried later.
        local http_code
        http_code=$(curl -s -o /dev/null -w '%{http_code}' -X POST \
            "https://huggingface.co/api/datasets/${HF_REPO}/commit/main" \
            -H "Authorization: Bearer ${HF_TOKEN}" \
            -H "Content-Type: application/json" \
            -d "{\"summary\":\"Add task ${task_id}\",\"actions\":[{\"action\":\"file\",\"path\":\"${repo_path}\",\"content\":\"${content_b64}\",\"encoding\":\"base64\"}]}")
        if [ "$http_code" -lt 200 ] || [ "$http_code" -ge 300 ]; then
            echo "[$(date)] Upload failed (HTTP ${http_code}): $repo_path" >&2
            failed=1
        fi
    done < <(find "$task_dir" -type f ! -name "$UPLOADED_MARKER")

    if [ "$failed" -eq 0 ]; then
        touch "${task_dir}/${UPLOADED_MARKER}"
        echo "[$(date)] Uploaded task: $task_rel"
    else
        echo "[$(date)] Task had upload failures, will retry next cycle: $task_rel" >&2
        return 1
    fi
}

echo "[$(date)] Auto-publish started (interval: ${INTERVAL}s)"

while true; do
    if [ -d "$OUTPUT_DIR" ]; then
        # Find task directories recursively (they contain workspace.yaml)
        while IFS= read -r ws_file; do
            task_dir=$(dirname "$ws_file")
            [ -f "${task_dir}/${UPLOADED_MARKER}" ] && continue
            # Compute relative path from OUTPUT_DIR
            task_rel="${task_dir#"$OUTPUT_DIR"/}"
            upload_task_dir "$task_dir" "$task_rel"
        done < <(find "$OUTPUT_DIR" -name "workspace.yaml" -type f 2>/dev/null)

        task_count=$(find "$OUTPUT_DIR" -name "workspace.yaml" -type f 2>/dev/null | wc -l)
        uploaded_count=$(find "$OUTPUT_DIR" -name "$UPLOADED_MARKER" -type f 2>/dev/null | wc -l)
        echo "[$(date)] Status: ${uploaded_count}/${task_count} tasks uploaded to HF"
    else
        echo "[$(date)] Output directory not found yet: $OUTPUT_DIR"
    fi

    echo "[$(date)] Sleeping ${INTERVAL}s until next publish cycle..."
    sleep $INTERVAL
done
4 changes: 2 additions & 2 deletions src/cli/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ pub struct SweMineArgs {
#[arg(long, default_value = "true")]
pub validate_workspace: bool,

/// Override enrichment concurrency (default: 10).
/// Override enrichment concurrency (default: 20).
#[arg(long)]
pub concurrency_enrich: Option<usize>,

Expand Down Expand Up @@ -231,7 +231,7 @@ pub struct SweBenchmarkArgs {
#[arg(long, default_value = "gharchive")]
pub source: String,

/// Override enrichment concurrency (default: 10).
/// Override enrichment concurrency (default: 20).
#[arg(long)]
pub concurrency_enrich: Option<usize>,

Expand Down
49 changes: 49 additions & 0 deletions src/export/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,40 @@ impl DatasetManager {
}
}

// Upload task directories (workspace.yaml, prompt.md, tests/) to HF under tasks/
if let Some(ref uploader) = self.uploader {
let mut task_dirs = Vec::new();
Self::find_task_dirs(&self.config.output_dir, &mut task_dirs);
for task_dir in &task_dirs {
let rel = task_dir
.strip_prefix(&self.config.output_dir)
.unwrap_or(task_dir);
let task_id = rel
.to_string_lossy()
.replace(std::path::MAIN_SEPARATOR, "/");
let repo_prefix = format!("tasks/{}", task_id);
match uploader
.upload_directory(task_dir, &repo_prefix, &format!("Add task {}", task_id))
.await
{
Ok(count) => {
tracing::info!(
task_id = %task_id,
files = count,
"Uploaded task directory to HF"
);
}
Err(e) => {
tracing::warn!(
task_id = %task_id,
error = %e,
"Failed to upload task directory to HF"
);
}
}
}
}

let summary = DatasetSummary {
total_tasks: total,
shard_count,
Expand All @@ -239,6 +273,21 @@ impl DatasetManager {
Ok(summary)
}

/// Recursively collect every directory under `dir` that contains a
/// `workspace.yaml` file (a "task directory"), appending each to `out`.
///
/// Subdirectories of a found task directory are not searched further.
/// Unreadable directories and entries are skipped silently — this is a
/// best-effort scan. Symbolic links are never followed (the dirent's own
/// file type is inspected), so cyclic links cannot cause an unbounded
/// traversal, unlike `path.is_dir()` which resolves links.
fn find_task_dirs(dir: &Path, out: &mut Vec<std::path::PathBuf>) {
    // Iterative DFS with an explicit stack: no recursion, so arbitrarily
    // deep trees cannot overflow the call stack.
    let mut pending = vec![dir.to_path_buf()];
    while let Some(current) = pending.pop() {
        let entries = match std::fs::read_dir(&current) {
            Ok(entries) => entries,
            Err(_) => continue,
        };
        for entry in entries.filter_map(|e| e.ok()) {
            // file_type() reports the dirent itself and does not follow
            // symlinks — prevents infinite loops on cyclic links.
            let is_dir = entry.file_type().map(|t| t.is_dir()).unwrap_or(false);
            if !is_dir {
                continue;
            }
            let path = entry.path();
            if path.join("workspace.yaml").exists() {
                out.push(path);
            } else {
                pending.push(path);
            }
        }
    }
}

fn generate_dataset_card(name: &str, hf_cfg: &HfUploadConfig) -> String {
format!(
r#"---
Expand Down
95 changes: 71 additions & 24 deletions src/export/hf_uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//! (including parquet) to a HuggingFace dataset repository.

use reqwest::Client;
use serde::{Deserialize, Serialize};
use serde::Serialize;
use std::path::Path;
use std::sync::Arc;
use tokio::sync::Mutex;
Expand All @@ -18,20 +18,6 @@ pub struct HfUploadConfig {
pub private: bool,
}

#[derive(Debug, Serialize)]
struct CreateRepoRequest {
#[serde(rename = "type")]
repo_type: String,
name: String,
private: bool,
}

#[derive(Debug, Deserialize)]
#[allow(dead_code)]
struct CreateRepoResponse {
url: Option<String>,
}

#[derive(Debug, Serialize)]
struct CommitAction {
action: String,
Expand Down Expand Up @@ -68,15 +54,21 @@ impl HfUploader {
pub async fn ensure_repo_exists(&self) -> anyhow::Result<()> {
let url = format!("{}/repos/create", HF_API_BASE);

// Extract org/name for the API
let name = self.config.repo_id.clone();

let body = CreateRepoRequest {
repo_type: "dataset".to_string(),
name,
private: self.config.private,
let (organization, name) = if let Some((org, n)) = self.config.repo_id.split_once('/') {
(Some(org.to_string()), n.to_string())
} else {
(None, self.config.repo_id.clone())
};

let mut body = serde_json::json!({
"type": "dataset",
"name": name,
"private": self.config.private,
});
if let Some(org) = organization {
body["organization"] = serde_json::Value::String(org);
}

let resp = self
.client
.post(&url)
Expand All @@ -87,12 +79,16 @@ impl HfUploader {

let status = resp.status();
if status.is_success() || status.as_u16() == 409 {
// 409 = already exists, that's fine
tracing::info!(repo = %self.config.repo_id, "HF dataset repo ready");
Ok(())
} else {
let text = resp.text().await.unwrap_or_default();
anyhow::bail!("Failed to create HF repo ({}): {}", status, text);
if text.contains("already created") || text.contains("already exist") {
tracing::info!(repo = %self.config.repo_id, "HF dataset repo already exists");
Ok(())
} else {
anyhow::bail!("Failed to create HF repo ({}): {}", status, text);
}
}
}

Expand Down Expand Up @@ -228,6 +224,57 @@ impl HfUploader {
.await
}

/// Upload an entire directory tree to the HF repo.
/// `local_dir` is the directory on disk.
/// `repo_prefix` is the prefix path inside the repo (e.g. "tasks/my-task-id").
///
/// Files are read fully into memory and pushed via `upload_files` in
/// commits of up to 20 files each, all using `commit_message`. Files
/// that cannot be read are skipped (best-effort); an unreadable
/// directory aborts with an error. Symbolic links are not followed, so
/// cyclic links cannot cause unbounded recursion.
///
/// Returns the number of files uploaded.
pub async fn upload_directory(
    &self,
    local_dir: &Path,
    repo_prefix: &str,
    commit_message: &str,
) -> anyhow::Result<usize> {
    // Depth-first walk collecting (repo_path, bytes) pairs.
    fn walk(dir: &Path, prefix: &str, out: &mut Vec<(String, Vec<u8>)>) -> anyhow::Result<()> {
        for entry in std::fs::read_dir(dir)? {
            let entry = entry?;
            let path = entry.path();
            let name = entry.file_name().to_string_lossy().to_string();
            let repo_path = if prefix.is_empty() {
                name
            } else {
                format!("{}/{}", prefix, name)
            };
            // file_type() inspects the dirent itself and does not follow
            // symlinks — unlike path.is_dir(), which could recurse forever
            // on a cyclic link.
            let file_type = entry.file_type()?;
            if file_type.is_dir() {
                walk(&path, &repo_path, out)?;
            } else if file_type.is_file() {
                // Best-effort: skip files that disappear or can't be read.
                if let Ok(bytes) = std::fs::read(&path) {
                    out.push((repo_path, bytes));
                }
            }
        }
        Ok(())
    }

    let mut file_pairs: Vec<(String, Vec<u8>)> = Vec::new();
    walk(local_dir, repo_prefix, &mut file_pairs)?;

    if file_pairs.is_empty() {
        return Ok(0);
    }

    let total = file_pairs.len();

    // Batch into commits of 20 files to keep each request payload bounded.
    for chunk in file_pairs.chunks(20) {
        let refs: Vec<(&str, &[u8])> = chunk
            .iter()
            .map(|(p, b)| (p.as_str(), b.as_slice()))
            .collect();
        self.upload_files(&refs, commit_message).await?;
    }

    Ok(total)
}

pub fn repo_url(&self) -> String {
format!("https://huggingface.co/datasets/{}", self.config.repo_id)
}
Expand Down
2 changes: 1 addition & 1 deletion src/swe/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ impl SwePipeline {
// Each event flows independently through: enrich -> filter -> pre-classify -> deep process.
// Semaphores control concurrency at each stage. No chunk barriers.
let deep_concurrency = config.concurrency_deep.unwrap_or(8);
let enrich_sem = Arc::new(Semaphore::new(config.concurrency_enrich.unwrap_or(10)));
let enrich_sem = Arc::new(Semaphore::new(config.concurrency_enrich.unwrap_or(20)));
let preclassify_sem =
Arc::new(Semaphore::new(config.concurrency_preclassify.unwrap_or(25)));
let deep_sem = Arc::new(Semaphore::new(deep_concurrency));
Expand Down
Loading