From da3efaa5bcb1cdecf81df938ad8d516f324e0c44 Mon Sep 17 00:00:00 2001 From: Alex Gleason Date: Tue, 28 Nov 2023 20:55:43 -0600 Subject: [PATCH] fetchWorker: support RequestInit --- src/deps-test.ts | 2 +- src/workers/fetch.test.ts | 21 +++++++++++-- src/workers/fetch.ts | 8 +++-- src/workers/fetch.worker.ts | 10 +++++-- src/workers/handlers/abortsignal.ts | 46 +++++++++++++++++++++++++++++ 5 files changed, 79 insertions(+), 8 deletions(-) create mode 100644 src/workers/handlers/abortsignal.ts diff --git a/src/deps-test.ts b/src/deps-test.ts index 1448854..3e6da88 100644 --- a/src/deps-test.ts +++ b/src/deps-test.ts @@ -1 +1 @@ -export { assert, assertEquals, assertThrows } from 'https://deno.land/std@0.198.0/assert/mod.ts'; +export { assert, assertEquals, assertRejects, assertThrows } from 'https://deno.land/std@0.198.0/assert/mod.ts'; diff --git a/src/workers/fetch.test.ts b/src/workers/fetch.test.ts index d5054fe..21fa1de 100644 --- a/src/workers/fetch.test.ts +++ b/src/workers/fetch.test.ts @@ -1,14 +1,31 @@ -import { assert } from '@/deps-test.ts'; +import { assert, assertRejects } from '@/deps-test.ts'; import { fetchWorker } from './fetch.ts'; +await sleep(2000); + Deno.test('fetchWorker', async () => { - await sleep(2000); const response = await fetchWorker('https://example.com'); const text = await response.text(); assert(text.includes('Example Domain')); }); +Deno.test({ + name: 'fetchWorker with AbortSignal', + async fn() { + const controller = new AbortController(); + const signal = controller.signal; + + setTimeout(() => controller.abort(), 100); + assertRejects(() => fetchWorker('http://httpbin.org/delay/10', { signal })); + + await new Promise((resolve) => { + signal.addEventListener('abort', () => resolve(), { once: true }); + }); + }, + sanitizeResources: false, +}); + function sleep(ms: number) { return new Promise((resolve) => setTimeout(resolve, ms)); } diff --git a/src/workers/fetch.ts b/src/workers/fetch.ts index 61a4a4a..87b622a 100644 --- a/src/workers/fetch.ts +++ b/src/workers/fetch.ts @@ -1,5 +1,7 @@ import { Comlink } from '@/deps.ts'; +import './handlers/abortsignal.ts'; + import type { FetchWorker } from './fetch.worker.ts'; const _worker = Comlink.wrap( @@ -9,10 +11,10 @@ const _worker = Comlink.wrap( ), ); -const fetchWorker: typeof fetch = async (input) => { +const fetchWorker: typeof fetch = async (input, init) => { + const { signal, ...rest } = init || {}; const url = input instanceof Request ? input.url : input.toString(); - - const args = await _worker.fetch(url); + const args = await _worker.fetch(url, rest, signal); return new Response(...args); }; diff --git a/src/workers/fetch.worker.ts b/src/workers/fetch.worker.ts index 3c86255..e36be4a 100644 --- a/src/workers/fetch.worker.ts +++ b/src/workers/fetch.worker.ts @@ -1,8 +1,14 @@ import { Comlink } from '@/deps.ts'; +import './handlers/abortsignal.ts'; + export const FetchWorker = { - async fetch(url: string): Promise<[BodyInit, ResponseInit]> { - const response = await fetch(url); + async fetch( + url: string, + init: Omit, + signal: AbortSignal | null | undefined, + ): Promise<[BodyInit, ResponseInit]> { + const response = await fetch(url, { ...init, signal }); return [ await response.text(), { diff --git a/src/workers/handlers/abortsignal.ts b/src/workers/handlers/abortsignal.ts new file mode 100644 index 0000000..c4c6a3e --- /dev/null +++ b/src/workers/handlers/abortsignal.ts @@ -0,0 +1,46 @@ +import { Comlink } from '@/deps.ts'; + +const signalFinalizers = new FinalizationRegistry((port: MessagePort) => { + port.postMessage(null); + port.close(); +}); + +Comlink.transferHandlers.set('abortsignal', { + canHandle(value) { + return value instanceof AbortSignal || value?.constructor?.name === 'AbortSignal'; + }, + serialize(signal) { + if (signal.aborted) { + return [{ aborted: true }]; + } + + const { port1, port2 } = new MessageChannel(); + signal.addEventListener( + 'abort', + () => port1.postMessage({ reason: signal.reason }), + { once: true }, + ); + + signalFinalizers?.register(signal, port1); + + return [{ aborted: false, port: port2 }, [port2]]; + }, + deserialize({ aborted, port }) { + if (aborted || !port) { + return AbortSignal.abort(); + } + + const ctrl = new AbortController(); + + port.addEventListener('message', (ev) => { + if (ev.data && 'reason' in ev.data) { + ctrl.abort(ev.data.reason); + } + port.close(); + }, { once: true }); + + port.start(); + + return ctrl.signal; + }, +} as Comlink.TransferHandler);