diff --git a/examples/typescript/cp/cp-example.ts b/examples/typescript/cp/cp-example.ts index 92b81711c1..dada9b737d 100644 --- a/examples/typescript/cp/cp-example.ts +++ b/examples/typescript/cp/cp-example.ts @@ -11,4 +11,11 @@ const container = 'nginx'; const srcPath = './test.txt'; const targetPath = '/tmp'; +// Simple copy without retries (default behavior) await cp.cpFromPod(namespace, pod, container, srcPath, targetPath); + +// For large files or unreliable connections, use retries: +// - maxTries > 0: Retry up to N times +// - maxTries < 0: Retry indefinitely +// await cp.cpFromPod(namespace, pod, container, srcPath, targetPath, undefined, 10); +// await cp.cpToPod(namespace, pod, container, srcPath, targetPath, 10); diff --git a/src/cp.ts b/src/cp.ts index f8920e908d..6e7495da63 100644 --- a/src/cp.ts +++ b/src/cp.ts @@ -1,9 +1,127 @@ +import { PassThrough, Readable } from 'stream'; import { WritableStreamBuffer } from 'stream-buffers'; import tar from 'tar-fs'; import { KubeConfig } from './config.js'; import { Exec } from './exec.js'; +/** + * TarPipe wraps the tar stream from a pod with retry capabilities. + * When a read error occurs during transfer, it can resume from the last successful byte position. + */ +class TarPipe extends Readable { + private src: { + namespace: string; + podName: string; + containerName: string; + srcPath: string; + cwd?: string; + }; + private execInstance: Exec; + private maxTries: number; + private bytesRead: number = 0; + private retries: number = 0; + private currentReader: Readable | null = null; + private errStream: WritableStreamBuffer | null = null; + + constructor( + namespace: string, + podName: string, + containerName: string, + srcPath: string, + execInstance: Exec, + maxTries: number, + cwd?: string, + ) { + super(); + this.src = { namespace, podName, containerName, srcPath, cwd }; + this.execInstance = execInstance; + this.maxTries = maxTries; + this.initReadFrom(0); + } + + private initReadFrom(offset: number): void { + let command: string[]; + if (this.maxTries !== 0 && offset > 0) { + // Use shell command with tail to resume from specific byte position + // tail -c+N is 1-indexed, so we add 1 to the 0-indexed offset + const tarCmd = this.src.cwd + ? `tar cf - -C ${this.src.cwd} ${this.src.srcPath}` + : `tar cf - ${this.src.srcPath}`; + command = ['sh', '-c', `${tarCmd} | tail -c+${offset + 1}`]; + } else { + command = ['tar', 'cf', '-']; + if (this.src.cwd) { + command.push('-C', this.src.cwd); + } + command.push(this.src.srcPath); + } + + const writerStream = new PassThrough(); + + this.errStream = new WritableStreamBuffer(); + this.currentReader = writerStream; + + // Set up stream event handlers + writerStream.on('data', (chunk: Buffer) => { + this.bytesRead += chunk.length; + if (!this.push(chunk)) { + writerStream.pause(); + } + }); + + writerStream.on('end', () => { + this.push(null); // Signal end of stream + }); + + writerStream.on('error', (err: Error) => { + this.handleError(err); + }); + + // Start exec + this.execInstance + .exec( + this.src.namespace, + this.src.podName, + this.src.containerName, + command, + writerStream, + this.errStream, + null, + false, + async () => { + if (this.errStream && this.errStream.size()) { + const errMsg = this.errStream.getContentsAsString(); + this.handleError(new Error(`Error from pod - details: \n ${errMsg}`)); + } + }, + ) + .catch((err: Error) => { + this.handleError(err); + }); + } + + private handleError(err: Error): void { + if (this.maxTries < 0 || this.retries < this.maxTries) { + this.retries++; + console.error( + `Resuming copy at ${this.bytesRead} bytes, retry ${this.retries}/${this.maxTries < 0 ? '∞' : this.maxTries}`, + ); + // Resume from the current byte position (bytesRead is the next byte to read) + this.initReadFrom(this.bytesRead); + } else { + console.error(`Dropping out copy after ${this.retries} retries`); + this.destroy(err); + } + } + + _read(): void { + if (this.currentReader) { + this.currentReader.resume(); + } + } +} + export class Cp { public execInstance: Exec; public constructor(config: KubeConfig, execInstance?: Exec) { @@ -17,6 +135,9 @@ export class Cp { * @param {string} srcPath - The source path in the pod * @param {string} tgtPath - The target path in local * @param {string} [cwd] - The directory that is used as the parent in the pod when downloading + * @param {number} [maxTries=0] - Set number of retries to complete a copy operation from a container. + * Specify 0 to disable or any negative value for infinite retrying. + * The default is 0 (no retry). */ public async cpFromPod( namespace: string, @@ -25,29 +146,63 @@ export class Cp { srcPath: string, tgtPath: string, cwd?: string, + maxTries: number = 0, ): Promise { - const command = ['tar', 'cf', '-']; - if (cwd) { - command.push('-C', cwd); + if (maxTries === 0) { + // Original implementation without retry + const command = ['tar', 'cf', '-']; + if (cwd) { + command.push('-C', cwd); + } + command.push(srcPath); + const writerStream = tar.extract(tgtPath); + const errStream = new WritableStreamBuffer(); + await this.execInstance.exec( + namespace, + podName, + containerName, + command, + writerStream, + errStream, + null, + false, + async () => { + if (errStream.size()) { + throw new Error( + `Error from cpFromPod - details: \n ${errStream.getContentsAsString()}`, + ); + } + }, + ); + } else { + // Implementation with retry using TarPipe + const tarPipe = new TarPipe( + namespace, + podName, + containerName, + srcPath, + this.execInstance, + maxTries, + cwd, + ); + const writerStream = tar.extract(tgtPath); + + return new Promise((resolve, reject) => { + tarPipe.on('error', (err: Error) => { + reject(err); + }); + + writerStream.on('error', (err: Error) => { + reject(err); + }); + + writerStream.on('finish', () => { + resolve(); + }); + + tarPipe.pipe(writerStream); + }); } - command.push(srcPath); - const writerStream = tar.extract(tgtPath); - const errStream = new WritableStreamBuffer(); - this.execInstance.exec( - namespace, - podName, - containerName, - command, - writerStream, - errStream, - null, - false, - async () => { - if (errStream.size()) { - throw new Error(`Error from cpFromPod - details: \n ${errStream.getContentsAsString()}`); - } - }, - ); } /** @@ -56,6 +211,9 @@ export class Cp { * @param {string} containerName - The name of the container in the pod to exec the command inside. * @param {string} srcPath - The source path in local * @param {string} tgtPath - The target path in the pod + * @param {number} [maxTries=0] - Set number of retries to complete a copy operation to a container. + * Specify 0 to disable or any negative value for infinite retrying. + * The default is 0 (no retry). */ public async cpToPod( namespace: string, @@ -63,24 +221,74 @@ export class Cp { containerName: string, srcPath: string, tgtPath: string, + maxTries: number = 0, ): Promise { - const command = ['tar', 'xf', '-', '-C', tgtPath]; - const readStream = tar.pack(srcPath); - const errStream = new WritableStreamBuffer(); - this.execInstance.exec( - namespace, - podName, - containerName, - command, - null, - errStream, - readStream, - false, - async () => { - if (errStream.size()) { - throw new Error(`Error from cpToPod - details: \n ${errStream.getContentsAsString()}`); + if (maxTries === 0) { + // Original implementation without retry + const command = ['tar', 'xf', '-', '-C', tgtPath]; + const readStream = tar.pack(srcPath); + const errStream = new WritableStreamBuffer(); + await this.execInstance.exec( + namespace, + podName, + containerName, + command, + null, + errStream, + readStream, + false, + async () => { + if (errStream.size()) { + throw new Error( + `Error from cpToPod - details: \n ${errStream.getContentsAsString()}`, + ); + } + }, + ); + } else { + // Implementation with retry + let retries = 0; + let lastError: Error | null = null; + + while (maxTries < 0 || retries <= maxTries) { + try { + const command = ['tar', 'xf', '-', '-C', tgtPath]; + const readStream = tar.pack(srcPath); + const errStream = new WritableStreamBuffer(); + + await this.execInstance.exec( + namespace, + podName, + containerName, + command, + null, + errStream, + readStream, + false, + async () => { + if (errStream.size()) { + throw new Error( + `Error from cpToPod - details: \n ${errStream.getContentsAsString()}`, + ); + } + }, + ); + // Success - exit the retry loop + return; + } catch (err) { + lastError = err as Error; + if (maxTries < 0 || retries < maxTries) { + retries++; + console.error( + `Retrying cpToPod, attempt ${retries}/${maxTries < 0 ? '∞' : maxTries}: ${lastError.message}`, + ); + } else { + break; + } } - }, - ); + } + + throw new Error(`cpToPod failed after ${retries} retries: ${lastError?.message}`); + } } } diff --git a/src/cp_test.ts b/src/cp_test.ts index fb38247df3..cfa32b0583 100644 --- a/src/cp_test.ts +++ b/src/cp_test.ts @@ -106,5 +106,43 @@ describe('Cp', () => { await cp.cpToPod(namespace, pod, container, srcPath, tgtPath); verify(fakeWebSocketInterface.connect(`${path}?${queryStr}`, null, anyFunction())).called(); }); + + it('should run extract tar command with retries parameter', async () => { + const kc = new KubeConfig(); + const fakeWebSocketInterface: WebSocketInterface = mock(WebSocketHandler); + const fakeWebSocket: WebSocket.WebSocket = mock(WebSocket) as WebSocket.WebSocket; + const callAwaiter: CallAwaiter = new CallAwaiter(); + const exec = new Exec(kc, instance(fakeWebSocketInterface)); + const cp = new Cp(kc, exec); + + const namespace = 'somenamespace'; + const pod = 'somepod'; + const container = 'container'; + const srcPath = 'testdata/archive.txt'; + const tgtPath = '/'; + const maxTries = 3; + const cmdArray = ['tar', 'xf', '-', '-C', tgtPath]; + const path = `/api/v1/namespaces/${namespace}/pods/${pod}/exec`; + + const query = { + stdout: false, + stderr: true, + stdin: true, + tty: false, + command: cmdArray, + container, + }; + const queryStr = querystring.stringify(query); + + const fakeConn: WebSocket.WebSocket = instance(fakeWebSocket); + when(fakeWebSocketInterface.connect(`${path}?${queryStr}`, null, anyFunction())).thenResolve( + fakeConn, + ); + when(fakeWebSocket.send(anything())).thenCall(callAwaiter.resolveCall('send')); + when(fakeWebSocket.close()).thenCall(callAwaiter.resolveCall('close')); + + await cp.cpToPod(namespace, pod, container, srcPath, tgtPath, maxTries); + verify(fakeWebSocketInterface.connect(`${path}?${queryStr}`, null, anyFunction())).called(); + }); }); });