diff --git a/src/deps.ts b/src/deps.ts index 35b4763..e899f24 100644 --- a/src/deps.ts +++ b/src/deps.ts @@ -71,5 +71,6 @@ export * as cron from 'https://deno.land/x/deno_cron@v1.0.0/cron.ts'; export { S3Client } from 'https://deno.land/x/s3_lite_client@0.6.1/mod.ts'; export { default as IpfsHash } from 'npm:ipfs-only-hash@^4.0.0'; export { default as uuid62 } from 'npm:uuid62@^1.0.2'; +export { Machina } from 'https://gitlab.com/soapbox-pub/nostr-machina/-/raw/08a157d39f2741c9a3a4364cb97db36e71d8c03a/mod.ts'; export type * as TypeFest from 'npm:type-fest@^4.3.0'; diff --git a/src/subscription.ts b/src/subscription.ts index 227c2f4..9492ab2 100644 --- a/src/subscription.ts +++ b/src/subscription.ts @@ -1,22 +1,19 @@ -import { type Event } from '@/deps.ts'; +import { type Event, Machina } from '@/deps.ts'; import { type DittoFilter, matchDittoFilters } from '@/filter.ts'; import type { EventData } from '@/types.ts'; class Subscription implements AsyncIterable> { filters: DittoFilter[]; - #next?: (event: Event) => void; - #closed = false; + #machina: Machina>; constructor(filters: DittoFilter[]) { this.filters = filters; + this.#machina = new Machina(); } stream(event: Event): void { - if (this.#next) { - this.#next(event); - this.#next = undefined; - } + this.#machina.push(event); } matches(event: Event, data: EventData): boolean { @@ -24,22 +21,11 @@ class Subscription implements AsyncIterable> } close() { - this.#closed = true; - this.#next?.(undefined!); + this.#machina.close(); } - async *[Symbol.asyncIterator]() { - while (true) { - const event = await new Promise>((resolve) => { - this.#next = resolve; - }); - - if (this.#closed) { - return; - } - - yield event; - } + [Symbol.asyncIterator]() { + return this.#machina.stream(); } }