Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 0 additions & 59 deletions src/worker/cleanup.ts

This file was deleted.

19 changes: 18 additions & 1 deletion src/worker/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ export async function registerRun(
.bind(existing.id)
.run();

// Best-effort terminate the old workflow instance
try {
const instance = await env.SCREENSHOT_WORKFLOW.get(existing.id);
await instance.terminate();
} catch {
// Workflow instance may already be gone
}

// Best-effort destroy the old sandbox
try {
const sandbox = getSandbox(env.Sandbox, existing.id);
Expand Down Expand Up @@ -105,7 +113,8 @@ export async function updateRunStatus(
}

/**
* Force-kill a run: mark it failed in D1 and destroy its sandbox DO.
* Force-kill a run: mark it failed in D1, terminate its workflow, and
* destroy its sandbox DO.
* Returns true if a run was found and updated.
*/
export async function killRun(env: Env, runId: string): Promise<boolean> {
Expand All @@ -122,6 +131,14 @@ export async function killRun(env: Env, runId: string): Promise<boolean> {
.bind(runId)
.run();

// Best-effort terminate the workflow instance
try {
const instance = await env.SCREENSHOT_WORKFLOW.get(runId);
await instance.terminate();
} catch {
// Workflow instance may already be gone
}

// Best-effort destroy the sandbox DO
try {
const sandbox = getSandbox(env.Sandbox, runId);
Expand Down
3 changes: 1 addition & 2 deletions src/worker/index.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { Hono } from "hono";
import { proxyToSandbox } from "@cloudflare/sandbox";
export { Sandbox } from "@cloudflare/sandbox";
export { ScreenshotWorkflow } from "./workflow";

import { handleCDP } from "./cdp";
import { handleQueue } from "./queue";
import { webhook } from "./routes/webhook";
import { start } from "./routes/start";
import { logs } from "./routes/logs";
Expand Down Expand Up @@ -56,5 +56,4 @@ app.get("/health", (c) => c.json({ status: "ok", service: "visual-diff" }));

export default {
fetch: app.fetch,
queue: handleQueue,
};
104 changes: 0 additions & 104 deletions src/worker/queue.ts

This file was deleted.

2 changes: 1 addition & 1 deletion src/worker/routes/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ messages.get("/", async (c) => {

// Try the live sandbox first
try {
// Read session metadata written by startScreenshotJob
// Read session metadata written by the workflow's start-agent step
const file = await Promise.race([
sandbox.readFile("/workspace/opencode-session.json"),
new Promise<never>((_, reject) =>
Expand Down
20 changes: 10 additions & 10 deletions src/worker/routes/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import {
parsePRUrl,
lookupInstallationId,
} from "../github";
import { buildQueueMessage } from "../queue";
import { registerRun } from "../db";
import type { Env } from "../types";

Expand Down Expand Up @@ -62,16 +61,17 @@ start.post("/", async (c) => {
commitSha: prDetails.headSha,
});

const message = await buildQueueMessage(c.env, {
owner,
repo,
prNumber,
commitSha: prDetails.headSha,
installationId,
prTitle: prDetails.title,
prDescription: prDetails.body,
await c.env.SCREENSHOT_WORKFLOW.create({
id: sid,
params: {
sandboxId: sid,
owner,
repo,
prNumber,
commitSha: prDetails.headSha,
installationId,
},
});
await c.env.SCREENSHOT_QUEUE.send({ ...message, sandboxId: sid });

return c.json(
{
Expand Down
44 changes: 24 additions & 20 deletions src/worker/routes/webhook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import {
createOctokit,
reactToComment,
} from "../github";
import { buildQueueMessage, buildQueueMessageFromPR } from "../queue";
import { registerRun } from "../db";
import type { Env } from "../types";

Expand Down Expand Up @@ -37,6 +36,7 @@ webhook.post("/", async (c) => {
const owner = payload.repository.owner.login;
const repo = payload.repository.name;
const commitSha = pr.merge_commit_sha ?? pr.head.sha;
const installationId = payload.installation.id;

await registerRun(c.env, {
id: sid,
Expand All @@ -46,17 +46,18 @@ webhook.post("/", async (c) => {
commitSha,
});

const installationId = payload.installation.id;
const message = await buildQueueMessage(c.env, {
owner,
repo,
prNumber: pr.number,
commitSha,
installationId,
prTitle: pr.title,
prDescription: pr.body ?? "",
await c.env.SCREENSHOT_WORKFLOW.create({
id: sid,
params: {
sandboxId: sid,
owner,
repo,
prNumber: pr.number,
commitSha,
installationId,
},
});
await c.env.SCREENSHOT_QUEUE.send({ ...message, sandboxId: sid });

return c.text("Accepted", 202);
}

Expand Down Expand Up @@ -93,15 +94,18 @@ webhook.post("/", async (c) => {
commitSha: pr.data.head.sha,
});

const message = await buildQueueMessageFromPR(
c.env,
owner,
repo,
prNumber,
pr.data.head.sha,
installationId,
);
await c.env.SCREENSHOT_QUEUE.send({ ...message, sandboxId: sid });
await c.env.SCREENSHOT_WORKFLOW.create({
id: sid,
params: {
sandboxId: sid,
owner,
repo,
prNumber,
commitSha: pr.data.head.sha,
installationId,
},
});

return c.text("Accepted", 202);
}

Expand Down
Loading