Refactor protocol and add acks

This commit is contained in:
Calvin Montgomery 2018-06-18 23:12:00 -07:00
parent d369f1ebe4
commit 86e023d233
3 changed files with 188 additions and 27 deletions

View File

@ -1,8 +1,14 @@
import { EventEmitter } from 'events'; import { EventEmitter } from 'events';
import { Multimap } from '../util/multimap'; import { Multimap } from '../util/multimap';
import clone from 'clone'; import clone from 'clone';
import typecheck from 'json-typecheck';
import uws from 'uws'; import uws from 'uws';
const LOGGER = require('@calzoneman/jsli')('uws');
const TYPE_FRAME = 0;
const TYPE_ACK = 1;
const rooms = new Multimap(); const rooms = new Multimap();
class UWSContext { class UWSContext {
@ -40,7 +46,16 @@ class UWSWrapper extends EventEmitter {
}; };
this._uwsSocket.on('message', message => { this._uwsSocket.on('message', message => {
this._emit.apply(this, this._decode(message)); try {
this._decode(message);
} catch (error) {
LOGGER.warn(
'Decode failed (ip=%s): %s',
this.context.ipAddress,
error
);
this.disconnect();
}
}); });
this._uwsSocket.on('close', () => { this._uwsSocket.on('close', () => {
@ -63,8 +78,17 @@ class UWSWrapper extends EventEmitter {
return !this._connected; return !this._connected;
} }
emit(frame, ...args) { emit(frame, payload) {
this._uwsSocket.send(encode(frame, args)); try {
this._uwsSocket.send(encode(frame, payload));
} catch (error) {
LOGGER.error(
'Emit failed (ip=%s): %s',
this.context.ipAddress,
error.stack
);
this.disconnect();
}
} }
join(room) { join(room) {
@ -78,16 +102,67 @@ class UWSWrapper extends EventEmitter {
} }
typecheckedOn(frame, typeDef, cb) { typecheckedOn(frame, typeDef, cb) {
this.on(frame, cb); this.on(frame, (data, ack) => {
typecheck(data, typeDef, (err, data) => {
if (err) {
this.emit('errorMsg', {
msg: 'Unexpected error for message ' + frame + ': ' +
err.message
});
} else {
cb(data, ack);
}
});
});
} }
typecheckedOnce(frame, typeDef, cb) { typecheckedOnce(frame, typeDef, cb) {
this.once(frame, cb); this.once(frame, (data, ack) => {
typecheck(data, typeDef, (err, data) => {
if (err) {
this.emit('errorMsg', {
msg: 'Unexpected error for message ' + frame + ': ' +
err.message
});
} else {
cb(data, ack);
}
});
});
}
_ack(ackId, payload) {
this._uwsSocket.send(JSON.stringify({
type: TYPE_ACK,
ackId,
payload
}));
} }
_decode(message) { _decode(message) {
// TODO: handle error and kill clients with protocol violations const { frame, type, ackId, payload } = JSON.parse(message);
return JSON.parse(message);
if (type !== TYPE_FRAME) {
LOGGER.warn(
'Unexpected message type %s from client; dropping',
type
);
return;
}
const args = [payload];
if (typeof ackId === 'number') {
args.push(payload => {
try {
this._ack(ackId, payload);
} catch (error) {
LOGGER.error('Error in ack callback: %s', error.stack);
}
});
}
this._emit(frame, ...args);
} }
} }
@ -148,8 +223,12 @@ class UWSServer extends EventEmitter {
} }
} }
function encode(frame, args) { function encode(frame, payload) {
return JSON.stringify([frame].concat(args)); return JSON.stringify({
type: TYPE_FRAME,
frame,
payload
});
} }
function inRoom(room) { function inRoom(room) {

View File

@ -19,9 +19,14 @@ describe('UWSServer', () => {
socket.test = new EventEmitter(); socket.test = new EventEmitter();
socket.onmessage = message => { socket.onmessage = message => {
const args = JSON.parse(message.data); const { type, frame, payload, ackId } = JSON.parse(message.data);
const frame = args.shift();
socket.test.emit(frame, ...args); if (type === 0) {
socket.test.emit(frame, payload);
} else if (type === 1) {
console.log(message.data);
socket.test.emit('ack', ackId, payload);
}
}; };
socket.onerror = e => { throw e; }; socket.onerror = e => { throw e; };
@ -80,4 +85,59 @@ describe('UWSServer', () => {
done(); done();
}); });
}); });
it('receives a normal frame', done => {
server.on('connection', s => {
s.on('test', data => {
assert.deepStrictEqual(data, {foo: 'bar'});
done();
});
});
socket = connect();
socket.onopen = () => {
socket.send(JSON.stringify({
type: 0,
frame: 'test',
payload: { foo: 'bar' }
}));
};
});
it('sends a normal frame', done => {
server.on('connection', s => {
s.emit('test', { foo: 'bar' });
});
socket = connect();
socket.test.on('test', data => {
assert.deepStrictEqual(data, { foo: 'bar' });
done();
});
});
it('responds with an ack frame', done => {
server.on('connection', s => {
s.on('test', (data, ack) => {
assert.deepStrictEqual(data, {foo: 'bar'});
ack({ baz: 'quux' });
});
});
socket = connect();
socket.onopen = () => {
socket.send(JSON.stringify({
type: 0,
frame: 'test',
payload: { foo: 'bar' },
ackId: 1
}));
socket.test.on('ack', (ackId, payload) => {
assert.strictEqual(ackId, 1);
assert.deepStrictEqual(payload, { baz: 'quux' });
done();
});
};
});
}); });

View File

@ -1,4 +1,7 @@
(function () { (function () {
var TYPE_FRAME = 0;
var TYPE_ACK = 1;
function WSShim(ws) { function WSShim(ws) {
this._ws = ws; this._ws = ws;
this._listeners = Object.create(null); this._listeners = Object.create(null);
@ -6,6 +9,9 @@
this._ws.onclose = this._onclose.bind(this); this._ws.onclose = this._onclose.bind(this);
this._ws.onmessage = this._onmessage.bind(this); this._ws.onmessage = this._onmessage.bind(this);
this._ws.onerror = this._onerror.bind(this); this._ws.onerror = this._onerror.bind(this);
this._ackId = 0;
this._pendingAcks = Object.create(null);
} }
WSShim.prototype.listeners = function listeners(frame) { WSShim.prototype.listeners = function listeners(frame) {
@ -20,20 +26,28 @@
this.listeners(frame).push(callback); this.listeners(frame).push(callback);
}; };
WSShim.prototype.emit = function emit(/* args */) { WSShim.prototype.emit = function emit(frame, payload, ack) {
var args = Array.prototype.slice.call(arguments).filter(function (it) { var message = {
// TODO: handle ack type: TYPE_FRAME,
return typeof it !== 'function'; frame: frame,
}); payload: payload
};
this._ws.send(JSON.stringify(args)); if (ack && typeof ack === 'function') {
message.ackId = ++this._ackId;
this._pendingAcks[message.ackId] = ack;
}
this._ws.send(JSON.stringify(message));
}; };
WSShim.prototype._emit = function _emit(frame /*, args */) { WSShim.prototype._emit = function _emit(frame, payload) {
var args = Array.prototype.slice.call(arguments, 1);
this.listeners(frame).forEach(function (cb) { this.listeners(frame).forEach(function (cb) {
cb.apply(null, args); try {
cb(payload);
} catch (error) {
console.error('Error in callback for ' + frame + ': ' + error);
}
}); });
}; };
@ -43,17 +57,25 @@
}; };
WSShim.prototype._onmessage = function _onmessage(message) { WSShim.prototype._onmessage = function _onmessage(message) {
var args;
try { try {
args = JSON.parse(message.data); var parsed = JSON.parse(message.data);
console.log(parsed);
var type = parsed.type;
var frame = parsed.frame;
var payload = parsed.payload;
var ackId = parsed.ackId;
if (type === TYPE_ACK && ackId in this._pendingAcks) {
this._pendingAcks[ackId](payload);
delete this._pendingAcks[ackId];
} else if (type === TYPE_FRAME) {
this._emit(frame, payload);
}
} catch (error) { } catch (error) {
console.error('Unparseable message from server: ' + message); console.error('Unparseable message from server: ' + message);
console.error(error.stack); console.error(error.stack);
return; return;
} }
this._emit.apply(this, args);
}; };
WSShim.prototype._onerror = function _onerror() { WSShim.prototype._onerror = function _onerror() {