diff --git a/src/watch.ts b/src/watch.ts index 85058117e2e..130b0b20395 100644 --- a/src/watch.ts +++ b/src/watch.ts @@ -44,7 +44,6 @@ export class Watch { const signal = AbortSignal.any([controller.signal, timeoutSignal]); const ctx = new RequestContext(watchURL.toString(), HttpMethod.GET); - await this.config.applySecurityAuthentication(ctx); let doneCalled: boolean = false; const doneCallOnce = (err: any) => { @@ -59,45 +58,70 @@ export class Watch { } }; - try { - const response = await fetch(watchURL, { - method: 'GET', - headers: ctx.getHeaders(), - dispatcher: ctx.getDispatcher(), - signal, - }); - - if (response.status === 200) { - const body = Readable.fromWeb(response.body! as any); + const startWatch = async (): Promise => { + const abortError = new DOMException('This operation was aborted', 'AbortError'); + const onAbort = () => { + doneCallOnce(abortError); + }; + let abortListenerAdded = false; + if (signal.aborted) { + onAbort(); + return; + } + try { + signal.addEventListener('abort', onAbort); + abortListenerAdded = true; - body.on('error', doneCallOnce); - body.on('close', () => doneCallOnce(null)); - body.on('finish', () => doneCallOnce(null)); + await this.config.applySecurityAuthentication(ctx); + if (signal.aborted) { + // If aborted during authentication, onAbort already dispatched done callback. + return; + } - const lines = createInterface(body); - lines.on('error', doneCallOnce); - lines.on('close', () => doneCallOnce(null)); - lines.on('finish', () => doneCallOnce(null)); - lines.on('line', (line) => { - try { - const data = JSON.parse(line.toString()); - callback(data.type, data.object, data); - } catch { - // ignore parse errors - } + const response = await fetch(watchURL, { + method: 'GET', + headers: ctx.getHeaders(), + dispatcher: ctx.getDispatcher(), + signal, }); - } else { - const statusText = - response.statusText || STATUS_CODES[response.status] || 'Internal Server Error'; - const error = new Error(statusText) as Error & { - statusCode: number | undefined; - }; - error.statusCode = response.status; - throw error; + + if (response.status === 200) { + const body = Readable.fromWeb(response.body! as any); + + body.on('error', doneCallOnce); + body.on('close', () => doneCallOnce(null)); + body.on('finish', () => doneCallOnce(null)); + + const lines = createInterface(body); + lines.on('error', doneCallOnce); + lines.on('close', () => doneCallOnce(null)); + lines.on('finish', () => doneCallOnce(null)); + lines.on('line', (line) => { + try { + const data = JSON.parse(line.toString()); + callback(data.type, data.object, data); + } catch { + // ignore parse errors + } + }); + } else { + const statusText = + response.statusText || STATUS_CODES[response.status] || 'Internal Server Error'; + const error = new Error(statusText) as Error & { + statusCode: number | undefined; + }; + error.statusCode = response.status; + throw error; + } + } catch (err) { + doneCallOnce(err); + } finally { + if (abortListenerAdded) { + signal.removeEventListener('abort', onAbort); + } } - } catch (err) { - doneCallOnce(err); - } + }; + startWatch().catch(doneCallOnce); return controller; } diff --git a/src/watch_test.ts b/src/watch_test.ts index 4aad61a05d9..6f562a09060 100644 --- a/src/watch_test.ts +++ b/src/watch_test.ts @@ -65,6 +65,10 @@ describe('Watch', () => { let doneCalled = false; let doneErr: any; + let doneResolve!: () => void; + const donePromise = new Promise((resolve) => { + doneResolve = resolve; + }); await watch.watch( path, @@ -73,8 +77,10 @@ describe('Watch', () => { (err: any) => { doneCalled = true; doneErr = err; + doneResolve(); }, ); + await donePromise; strictEqual(doneCalled, true); strictEqual(doneErr.toString(), 'Error: Internal Server Error'); mockAgent.assertNoPendingInterceptors(); @@ -170,7 +176,7 @@ describe('Watch', () => { const watch = new Watch(kc); let doneCalled = 0; - let doneResolve: () => void; + let doneResolve!: () => void; const donePromise = new Promise((resolve) => { doneResolve = resolve; @@ -364,7 +370,7 @@ describe('Watch', () => { let doneErr: any; - let doneResolve: () => void; + let doneResolve!: () => void; const donePromise = new Promise((resolve) => { doneResolve = resolve; }); @@ -386,6 +392,91 @@ describe('Watch', () => { strictEqual(doneErr.name, 'TimeoutError'); }); + it('should return abort controller before receiving response data', async (t) => { + const kc = await setupMockSystem(t, (_req: any, _res: any) => { + // Intentionally do not write headers/body so fetch stays pending. + }); + const watch = new Watch(kc); + + let doneErr: any; + + let doneResolve!: () => void; + const donePromise = new Promise((resolve) => { + doneResolve = resolve; + }); + + const controllerPromise = watch.watch( + '/some/path/to/object', + {}, + () => { + throw new Error('Unexpected data received'); + }, + (err: any) => { + doneErr = err; + doneResolve(); + }, + ); + + const controller = await Promise.race([ + controllerPromise, + new Promise((_, reject) => { + setTimeout(() => { + reject(new Error('watch() did not return AbortController in time')); + }, 100); + }), + ]); + + controller.abort(); + await donePromise; + strictEqual(doneErr?.name, 'AbortError'); + }); + + it('should abort before fetch starts when controller is aborted early', async (t) => { + const AUTH_DELAY_MS = 50; + const ASSERT_NO_REQUEST_DELAY_MS = 75; + let requestReceived = false; + const kc = await setupMockSystem(t, () => { + requestReceived = true; + }); + const watch = new Watch(kc); + const originalApplySecurityAuthentication = watch.config.applySecurityAuthentication.bind( + watch.config, + ); + watch.config.applySecurityAuthentication = async (ctx: any) => { + await new Promise((resolve) => { + setTimeout(resolve, AUTH_DELAY_MS); + }); + await originalApplySecurityAuthentication(ctx); + }; + + let doneErr: any; + let doneResolve!: () => void; + const donePromise = new Promise((resolve) => { + doneResolve = resolve; + }); + + const controller = await watch.watch( + '/some/path/to/object', + {}, + () => { + throw new Error('Unexpected data received'); + }, + (err: any) => { + doneErr = err; + doneResolve(); + }, + ); + + controller.abort(); + await donePromise; + await new Promise((resolve) => { + // Wait longer than AUTH_DELAY_MS to ensure any incorrect fetch startup would have occurred. + setTimeout(resolve, ASSERT_NO_REQUEST_DELAY_MS); + }); + strictEqual(doneErr?.name, 'AbortError'); + strictEqual(requestReceived, false); + }); + it('should throw on empty config', async () => { const kc = new KubeConfig(); const watch = new Watch(kc);