Merge branch 'fetchworker-abort' into 'main'

fetchWorker: support RequestInit

See merge request soapbox-pub/ditto!63
This commit is contained in:
Alex Gleason 2023-11-29 03:34:45 +00:00
commit 06a2eaf8c3
5 changed files with 79 additions and 8 deletions

View File

@ -1 +1 @@
export { assert, assertEquals, assertThrows } from 'https://deno.land/std@0.198.0/assert/mod.ts'; export { assert, assertEquals, assertRejects, assertThrows } from 'https://deno.land/std@0.198.0/assert/mod.ts';

View File

@ -1,14 +1,31 @@
import { assert } from '@/deps-test.ts'; import { assert, assertRejects } from '@/deps-test.ts';
import { fetchWorker } from './fetch.ts'; import { fetchWorker } from './fetch.ts';
await sleep(2000);
Deno.test('fetchWorker', async () => { Deno.test('fetchWorker', async () => {
await sleep(2000);
const response = await fetchWorker('https://example.com'); const response = await fetchWorker('https://example.com');
const text = await response.text(); const text = await response.text();
assert(text.includes('Example Domain')); assert(text.includes('Example Domain'));
}); });
Deno.test({
name: 'fetchWorker with AbortSignal',
async fn() {
const controller = new AbortController();
const signal = controller.signal;
setTimeout(() => controller.abort(), 100);
assertRejects(() => fetchWorker('http://httpbin.org/delay/10', { signal }));
await new Promise<void>((resolve) => {
signal.addEventListener('abort', () => resolve(), { once: true });
});
},
sanitizeResources: false,
});
function sleep(ms: number) { function sleep(ms: number) {
return new Promise((resolve) => setTimeout(resolve, ms)); return new Promise((resolve) => setTimeout(resolve, ms));
} }

View File

@ -1,5 +1,7 @@
import { Comlink } from '@/deps.ts'; import { Comlink } from '@/deps.ts';
import './handlers/abortsignal.ts';
import type { FetchWorker } from './fetch.worker.ts'; import type { FetchWorker } from './fetch.worker.ts';
const _worker = Comlink.wrap<typeof FetchWorker>( const _worker = Comlink.wrap<typeof FetchWorker>(
@ -9,10 +11,10 @@ const _worker = Comlink.wrap<typeof FetchWorker>(
), ),
); );
const fetchWorker: typeof fetch = async (input) => { const fetchWorker: typeof fetch = async (input, init) => {
const { signal, ...rest } = init || {};
const url = input instanceof Request ? input.url : input.toString(); const url = input instanceof Request ? input.url : input.toString();
const args = await _worker.fetch(url, rest, signal);
const args = await _worker.fetch(url);
return new Response(...args); return new Response(...args);
}; };

View File

@ -1,8 +1,14 @@
import { Comlink } from '@/deps.ts'; import { Comlink } from '@/deps.ts';
import './handlers/abortsignal.ts';
export const FetchWorker = { export const FetchWorker = {
async fetch(url: string): Promise<[BodyInit, ResponseInit]> { async fetch(
const response = await fetch(url); url: string,
init: Omit<RequestInit, 'signal'>,
signal: AbortSignal | null | undefined,
): Promise<[BodyInit, ResponseInit]> {
const response = await fetch(url, { ...init, signal });
return [ return [
await response.text(), await response.text(),
{ {

View File

@ -0,0 +1,46 @@
import { Comlink } from '@/deps.ts';
const signalFinalizers = new FinalizationRegistry((port: MessagePort) => {
port.postMessage(null);
port.close();
});
Comlink.transferHandlers.set('abortsignal', {
canHandle(value) {
return value instanceof AbortSignal || value?.constructor?.name === 'AbortSignal';
},
serialize(signal) {
if (signal.aborted) {
return [{ aborted: true }];
}
const { port1, port2 } = new MessageChannel();
signal.addEventListener(
'abort',
() => port1.postMessage({ reason: signal.reason }),
{ once: true },
);
signalFinalizers?.register(signal, port1);
return [{ aborted: false, port: port2 }, [port2]];
},
deserialize({ aborted, port }) {
if (aborted || !port) {
return AbortSignal.abort();
}
const ctrl = new AbortController();
port.addEventListener('message', (ev) => {
if (ev.data && 'reason' in ev.data) {
ctrl.abort(ev.data.reason);
}
port.close();
}, { once: true });
port.start();
return ctrl.signal;
},
} as Comlink.TransferHandler<AbortSignal, { aborted: boolean; port?: MessagePort }>);