From a708f2eb7494d9cd98f604a940b2b0509177393f Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 4 Feb 2026 19:53:52 +0000 Subject: [PATCH 1/4] Initial plan From 0b59dad588d787014616f02c3ffd3b170e818b30 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 4 Feb 2026 20:00:42 +0000 Subject: [PATCH 2/4] Add retry logic to cpFromPod and cpToPod functions Co-authored-by: brendandburns <5751682+brendandburns@users.noreply.github.com> --- src/cp.ts | 287 ++++++++++++++++++++++++++++++++++++++++++------- src/cp_test.ts | 38 +++++++ 2 files changed, 287 insertions(+), 38 deletions(-) diff --git a/src/cp.ts b/src/cp.ts index f8920e908d..1c318d38fb 100644 --- a/src/cp.ts +++ b/src/cp.ts @@ -1,9 +1,130 @@ +import { Readable, Transform } from 'stream'; import { WritableStreamBuffer } from 'stream-buffers'; import tar from 'tar-fs'; import { KubeConfig } from './config.js'; import { Exec } from './exec.js'; +/** + * TarPipe wraps the tar stream from a pod with retry capabilities. + * When a read error occurs during transfer, it can resume from the last successful byte position. + */ +class TarPipe extends Readable { + private src: { + namespace: string; + podName: string; + containerName: string; + srcPath: string; + cwd?: string; + }; + private execInstance: Exec; + private maxTries: number; + private bytesRead: number = 0; + private retries: number = 0; + private currentReader: Readable | null = null; + private errStream: WritableStreamBuffer | null = null; + + constructor( + namespace: string, + podName: string, + containerName: string, + srcPath: string, + execInstance: Exec, + maxTries: number, + cwd?: string, + ) { + super(); + this.src = { namespace, podName, containerName, srcPath, cwd }; + this.execInstance = execInstance; + this.maxTries = maxTries; + this.initReadFrom(0); + } + + private initReadFrom(offset: number): void { + let command: string[]; + if (this.maxTries !== 0 && offset > 0) { + // Use shell command with tail to resume from specific byte position + const tarCmd = this.src.cwd + ? `tar cf - -C ${this.src.cwd} ${this.src.srcPath}` + : `tar cf - ${this.src.srcPath}`; + command = ['sh', '-c', `${tarCmd} | tail -c+${offset}`]; + } else { + command = ['tar', 'cf', '-']; + if (this.src.cwd) { + command.push('-C', this.src.cwd); + } + command.push(this.src.srcPath); + } + + const writerStream = new Transform({ + transform: (chunk, encoding, callback) => { + callback(null, chunk); + }, + }); + + this.errStream = new WritableStreamBuffer(); + this.currentReader = writerStream; + + // Set up stream event handlers + writerStream.on('data', (chunk: Buffer) => { + this.bytesRead += chunk.length; + if (!this.push(chunk)) { + writerStream.pause(); + } + }); + + writerStream.on('end', () => { + this.push(null); // Signal end of stream + }); + + writerStream.on('error', (err: Error) => { + this.handleError(err); + }); + + // Start exec + this.execInstance + .exec( + this.src.namespace, + this.src.podName, + this.src.containerName, + command, + writerStream, + this.errStream, + null, + false, + async () => { + if (this.errStream && this.errStream.size()) { + const errMsg = this.errStream.getContentsAsString(); + this.handleError(new Error(`Error from pod - details: \n ${errMsg}`)); + } + }, + ) + .catch((err: Error) => { + this.handleError(err); + }); + } + + private handleError(err: Error): void { + if (this.maxTries < 0 || this.retries < this.maxTries) { + this.retries++; + console.error( + `Resuming copy at ${this.bytesRead} bytes, retry ${this.retries}/${this.maxTries < 0 ? '∞' : this.maxTries}`, + ); + // Resume from the next byte position + this.initReadFrom(this.bytesRead + 1); + } else { + console.error(`Dropping out copy after ${this.retries} retries`); + this.destroy(err); + } + } + + _read(): void { + if (this.currentReader) { + this.currentReader.resume(); + } + } +} + export class Cp { public execInstance: Exec; public constructor(config: KubeConfig, execInstance?: Exec) { @@ -17,6 +138,9 @@ export class Cp { * @param {string} srcPath - The source path in the pod * @param {string} tgtPath - The target path in local * @param {string} [cwd] - The directory that is used as the parent in the pod when downloading + * @param {number} [maxTries=0] - Set number of retries to complete a copy operation from a container. + * Specify 0 to disable or any negative value for infinite retrying. + * The default is 0 (no retry). */ public async cpFromPod( namespace: string, @@ -25,29 +149,63 @@ export class Cp { srcPath: string, tgtPath: string, cwd?: string, + maxTries: number = 0, ): Promise { - const command = ['tar', 'cf', '-']; - if (cwd) { - command.push('-C', cwd); + if (maxTries === 0) { + // Original implementation without retry + const command = ['tar', 'cf', '-']; + if (cwd) { + command.push('-C', cwd); + } + command.push(srcPath); + const writerStream = tar.extract(tgtPath); + const errStream = new WritableStreamBuffer(); + await this.execInstance.exec( + namespace, + podName, + containerName, + command, + writerStream, + errStream, + null, + false, + async () => { + if (errStream.size()) { + throw new Error( + `Error from cpFromPod - details: \n ${errStream.getContentsAsString()}`, + ); + } + }, + ); + } else { + // Implementation with retry using TarPipe + const tarPipe = new TarPipe( + namespace, + podName, + containerName, + srcPath, + this.execInstance, + maxTries, + cwd, + ); + const writerStream = tar.extract(tgtPath); + + return new Promise((resolve, reject) => { + tarPipe.on('error', (err: Error) => { + reject(err); + }); + + writerStream.on('error', (err: Error) => { + reject(err); + }); + + writerStream.on('finish', () => { + resolve(); + }); + + tarPipe.pipe(writerStream); + }); } - command.push(srcPath); - const writerStream = tar.extract(tgtPath); - const errStream = new WritableStreamBuffer(); - this.execInstance.exec( - namespace, - podName, - containerName, - command, - writerStream, - errStream, - null, - false, - async () => { - if (errStream.size()) { - throw new Error(`Error from cpFromPod - details: \n ${errStream.getContentsAsString()}`); - } - }, - ); } /** @@ -56,6 +214,9 @@ export class Cp { * @param {string} containerName - The name of the container in the pod to exec the command inside. * @param {string} srcPath - The source path in local * @param {string} tgtPath - The target path in the pod + * @param {number} [maxTries=0] - Set number of retries to complete a copy operation to a container. + * Specify 0 to disable or any negative value for infinite retrying. + * The default is 0 (no retry). */ public async cpToPod( namespace: string, @@ -63,24 +224,74 @@ export class Cp { containerName: string, srcPath: string, tgtPath: string, + maxTries: number = 0, ): Promise { - const command = ['tar', 'xf', '-', '-C', tgtPath]; - const readStream = tar.pack(srcPath); - const errStream = new WritableStreamBuffer(); - this.execInstance.exec( - namespace, - podName, - containerName, - command, - null, - errStream, - readStream, - false, - async () => { - if (errStream.size()) { - throw new Error(`Error from cpToPod - details: \n ${errStream.getContentsAsString()}`); + if (maxTries === 0) { + // Original implementation without retry + const command = ['tar', 'xf', '-', '-C', tgtPath]; + const readStream = tar.pack(srcPath); + const errStream = new WritableStreamBuffer(); + await this.execInstance.exec( + namespace, + podName, + containerName, + command, + null, + errStream, + readStream, + false, + async () => { + if (errStream.size()) { + throw new Error( + `Error from cpToPod - details: \n ${errStream.getContentsAsString()}`, + ); + } + }, + ); + } else { + // Implementation with retry + let retries = 0; + let lastError: Error | null = null; + + while (maxTries < 0 || retries <= maxTries) { + try { + const command = ['tar', 'xf', '-', '-C', tgtPath]; + const readStream = tar.pack(srcPath); + const errStream = new WritableStreamBuffer(); + + await this.execInstance.exec( + namespace, + podName, + containerName, + command, + null, + errStream, + readStream, + false, + async () => { + if (errStream.size()) { + throw new Error( + `Error from cpToPod - details: \n ${errStream.getContentsAsString()}`, + ); + } + }, + ); + // Success - exit the retry loop + return; + } catch (err) { + lastError = err as Error; + if (maxTries < 0 || retries < maxTries) { + retries++; + console.error( + `Retrying cpToPod, attempt ${retries}/${maxTries < 0 ? '∞' : maxTries}: ${lastError.message}`, + ); + } else { + break; + } } - }, - ); + } + + throw new Error(`cpToPod failed after ${retries} retries: ${lastError?.message}`); + } } } diff --git a/src/cp_test.ts b/src/cp_test.ts index fb38247df3..cfa32b0583 100644 --- a/src/cp_test.ts +++ b/src/cp_test.ts @@ -106,5 +106,43 @@ describe('Cp', () => { await cp.cpToPod(namespace, pod, container, srcPath, tgtPath); verify(fakeWebSocketInterface.connect(`${path}?${queryStr}`, null, anyFunction())).called(); }); + + it('should run extract tar command with retries parameter', async () => { + const kc = new KubeConfig(); + const fakeWebSocketInterface: WebSocketInterface = mock(WebSocketHandler); + const fakeWebSocket: WebSocket.WebSocket = mock(WebSocket) as WebSocket.WebSocket; + const callAwaiter: CallAwaiter = new CallAwaiter(); + const exec = new Exec(kc, instance(fakeWebSocketInterface)); + const cp = new Cp(kc, exec); + + const namespace = 'somenamespace'; + const pod = 'somepod'; + const container = 'container'; + const srcPath = 'testdata/archive.txt'; + const tgtPath = '/'; + const maxTries = 3; + const cmdArray = ['tar', 'xf', '-', '-C', tgtPath]; + const path = `/api/v1/namespaces/${namespace}/pods/${pod}/exec`; + + const query = { + stdout: false, + stderr: true, + stdin: true, + tty: false, + command: cmdArray, + container, + }; + const queryStr = querystring.stringify(query); + + const fakeConn: WebSocket.WebSocket = instance(fakeWebSocket); + when(fakeWebSocketInterface.connect(`${path}?${queryStr}`, null, anyFunction())).thenResolve( + fakeConn, + ); + when(fakeWebSocket.send(anything())).thenCall(callAwaiter.resolveCall('send')); + when(fakeWebSocket.close()).thenCall(callAwaiter.resolveCall('close')); + + await cp.cpToPod(namespace, pod, container, srcPath, tgtPath, maxTries); + verify(fakeWebSocketInterface.connect(`${path}?${queryStr}`, null, anyFunction())).called(); + }); }); }); From 31d29d2d9955c2758a8e38ac0480088225eba74d Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 4 Feb 2026 20:03:36 +0000 Subject: [PATCH 3/4] Fix byte offset issues in retry logic and use PassThrough stream Co-authored-by: brendandburns <5751682+brendandburns@users.noreply.github.com> --- src/cp.ts | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/src/cp.ts b/src/cp.ts index 1c318d38fb..6e7495da63 100644 --- a/src/cp.ts +++ b/src/cp.ts @@ -1,4 +1,4 @@ -import { Readable, Transform } from 'stream'; +import { PassThrough, Readable } from 'stream'; import { WritableStreamBuffer } from 'stream-buffers'; import tar from 'tar-fs'; @@ -44,10 +44,11 @@ class TarPipe extends Readable { let command: string[]; if (this.maxTries !== 0 && offset > 0) { // Use shell command with tail to resume from specific byte position + // tail -c+N is 1-indexed, so we add 1 to the 0-indexed offset const tarCmd = this.src.cwd ? `tar cf - -C ${this.src.cwd} ${this.src.srcPath}` : `tar cf - ${this.src.srcPath}`; - command = ['sh', '-c', `${tarCmd} | tail -c+${offset}`]; + command = ['sh', '-c', `${tarCmd} | tail -c+${offset + 1}`]; } else { command = ['tar', 'cf', '-']; if (this.src.cwd) { @@ -56,11 +57,7 @@ class TarPipe extends Readable { command.push(this.src.srcPath); } - const writerStream = new Transform({ - transform: (chunk, encoding, callback) => { - callback(null, chunk); - }, - }); + const writerStream = new PassThrough(); this.errStream = new WritableStreamBuffer(); this.currentReader = writerStream; @@ -110,8 +107,8 @@ class TarPipe extends Readable { console.error( `Resuming copy at ${this.bytesRead} bytes, retry ${this.retries}/${this.maxTries < 0 ? '∞' : this.maxTries}`, ); - // Resume from the next byte position - this.initReadFrom(this.bytesRead + 1); + // Resume from the current byte position (bytesRead is the next byte to read) + this.initReadFrom(this.bytesRead); } else { console.error(`Dropping out copy after ${this.retries} retries`); this.destroy(err); From 13cc6cbfc0cde23211764a65b784a11e5c6f5e7e Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 4 Feb 2026 20:05:18 +0000 Subject: [PATCH 4/4] Update cp example with retry usage documentation Co-authored-by: brendandburns <5751682+brendandburns@users.noreply.github.com> --- examples/typescript/cp/cp-example.ts | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/examples/typescript/cp/cp-example.ts b/examples/typescript/cp/cp-example.ts index 92b81711c1..dada9b737d 100644 --- a/examples/typescript/cp/cp-example.ts +++ b/examples/typescript/cp/cp-example.ts @@ -11,4 +11,11 @@ const container = 'nginx'; const srcPath = './test.txt'; const targetPath = '/tmp'; +// Simple copy without retries (default behavior) await cp.cpFromPod(namespace, pod, container, srcPath, targetPath); + +// For large files or unreliable connections, use retries: +// - maxTries > 0: Retry up to N times +// - maxTries < 0: Retry indefinitely +// await cp.cpFromPod(namespace, pod, container, srcPath, targetPath, undefined, 10); +// await cp.cpToPod(namespace, pod, container, srcPath, targetPath, 10);