import { Bert } from "./bert.js"; import { Duplex } from "node:stream"; const log = (msg) => process.stderr.write(`${msg}\r\n`); /** * Only handles Buffers. */ export class DumbPort extends Duplex { static LEN_LEN = 2; buf; constructor() { super(); this.buf = Buffer.alloc(65_535 + DumbPort.LEN_LEN); process.stdin.on("readable", () => { while (this._read()) ; }); } _read() { const lenBuf = process.stdin.read(DumbPort.LEN_LEN); if (lenBuf) { // Update this if you update LEN_LEN const len = lenBuf.readUInt16BE(0); const buf = process.stdin.read(len); this.push(buf); return buf; } else { this.push(null); return null; } } _write(obj, encodingOrCallback, callback) { const actualCallback = callback || typeof encodingOrCallback === "function" ? encodingOrCallback : undefined; if (Buffer.isBuffer(obj)) { this.buf.writeUInt16BE(obj.byteLength); obj.copy(this.buf, DumbPort.LEN_LEN); process.stdout.write(this.buf.subarray(0, obj.byteLength + DumbPort.LEN_LEN), actualCallback); } else throw new Error("write was passed non-Buffer"); } } /** * Handles full Erlang/Elixir terms. */ export class Port extends Duplex { bert; lenBuffer = Buffer.alloc(4); constructor(allBinariesAsString, mapKeyAsAtom, decodeUndefinedValues) { super({ objectMode: true }); this.bert = new Bert(allBinariesAsString, mapKeyAsAtom, decodeUndefinedValues); process.stdin.on("readable", () => { while (this._read()) { } }); } _read() { const lenBytes = process.stdin.read(4); if (lenBytes) { const termLen = this.bert.bytesToInt(lenBytes, 4, true); log(`Got incoming term length: ${termLen} (bytes: <<${lenBytes.toString('hex').match(/../g).map((hex) => parseInt(`0x${hex}`).toString()).join(', ')}>>).`); const termBytes = process.stdin.read(termLen); if (termBytes) { const decoded = this.bert.decode(termBytes); this.push(decoded); return decoded; } else { log("Term read got erroneous null."); return null; } } else { this.push(null); return null; } } _write(obj, encodingOrCallback, callback) { const actualCallback = callback || typeof encodingOrCallback === "function" ? encodingOrCallback : undefined; log(`Port was written to.`); try { const term = this.bert.encode(obj, false); this.lenBuffer.writeUInt32BE(term.length); process.stdout.write(this.lenBuffer); process.stdout.write(term, actualCallback); return true; } catch (error) { log(`Error writing: ${error}`); return false; } } } export class Server { port; handler; state = undefined; handleTerm = (term) => { if (this.state === undefined) { this.state = term; } else { this.handler(term, (t) => this.port.write(t), this.state, (reply, ...extraArgs) => { if (reply === "reply") this.port.write(term); if (reply === "reply" && extraArgs.length === 2) { this.state = extraArgs[1]; } else if (reply === "noreply" && extraArgs.length === 1) { this.state = extraArgs[0]; } }); } }; constructor(port, handler) { this.port = port; this.handler = handler; port.on("readable", () => { let term; while (term = port.read()) { this.handleTerm(term); } }); } }