noderl/dist/main.js

125 lines
4.0 KiB
JavaScript

import { Bert } from "./bert.js";
import { Duplex } from "node:stream";
const log = (msg) => process.stderr.write(`${msg}\r\n`);
/**
* Only handles Buffers.
*/
export class DumbPort extends Duplex {
static LEN_LEN = 2;
buf;
constructor() {
super();
this.buf = Buffer.alloc(65_535 + DumbPort.LEN_LEN);
process.stdin.on("readable", () => {
while (this._read())
;
});
}
_read() {
const lenBuf = process.stdin.read(DumbPort.LEN_LEN);
if (lenBuf) {
// Update this if you update LEN_LEN
const len = lenBuf.readUInt16BE(0);
const buf = process.stdin.read(len);
this.push(buf);
return buf;
}
else {
this.push(null);
return null;
}
}
_write(obj, encodingOrCallback, callback) {
const actualCallback = callback || typeof encodingOrCallback === "function" ? encodingOrCallback : undefined;
if (Buffer.isBuffer(obj)) {
this.buf.writeUInt16BE(obj.byteLength);
obj.copy(this.buf, DumbPort.LEN_LEN);
process.stdout.write(this.buf.subarray(0, obj.byteLength + DumbPort.LEN_LEN), actualCallback);
}
else
throw new Error("write was passed non-Buffer");
}
}
/**
* Handles full Erlang/Elixir terms.
*/
export class Port extends Duplex {
bert;
lenBuffer = Buffer.alloc(4);
constructor(allBinariesAsString, mapKeyAsAtom, decodeUndefinedValues) {
super({ objectMode: true });
this.bert = new Bert(allBinariesAsString, mapKeyAsAtom, decodeUndefinedValues);
process.stdin.on("readable", () => {
while (this._read()) { }
});
}
_read() {
const lenBytes = process.stdin.read(4);
if (lenBytes) {
const termLen = this.bert.bytesToInt(lenBytes, 4, true);
log(`Got incoming term length: ${termLen} (bytes: <<${lenBytes.toString('hex').match(/../g).map((hex) => parseInt(`0x${hex}`).toString()).join(', ')}>>).`);
const termBytes = process.stdin.read(termLen);
if (termBytes) {
const decoded = this.bert.decode(termBytes);
this.push(decoded);
return decoded;
}
else {
log("Term read got erroneous null.");
return null;
}
}
else {
this.push(null);
return null;
}
}
_write(obj, encodingOrCallback, callback) {
const actualCallback = callback || typeof encodingOrCallback === "function" ? encodingOrCallback : undefined;
log(`Port was written to.`);
try {
const term = this.bert.encode(obj, false);
this.lenBuffer.writeUInt32BE(term.length);
process.stdout.write(this.lenBuffer);
process.stdout.write(term, actualCallback);
return true;
}
catch (error) {
log(`Error writing: ${error}`);
return false;
}
}
}
export class Server {
port;
handler;
state = undefined;
handleTerm = (term) => {
if (this.state === undefined) {
this.state = term;
}
else {
this.handler(term, (t) => this.port.write(t), this.state, (reply, ...extraArgs) => {
if (reply === "reply")
this.port.write(term);
if (reply === "reply" && extraArgs.length === 2) {
this.state = extraArgs[1];
}
else if (reply === "noreply" && extraArgs.length === 1) {
this.state = extraArgs[0];
}
});
}
};
constructor(port, handler) {
this.port = port;
this.handler = handler;
port.on("readable", () => {
let term;
while (term = port.read()) {
this.handleTerm(term);
}
});
}
}