diff --git a/src/io/uws.js b/src/io/uws.js index 6485f6a1..44641042 100644 --- a/src/io/uws.js +++ b/src/io/uws.js @@ -1,8 +1,14 @@ import { EventEmitter } from 'events'; import { Multimap } from '../util/multimap'; import clone from 'clone'; +import typecheck from 'json-typecheck'; import uws from 'uws'; +const LOGGER = require('@calzoneman/jsli')('uws'); + +const TYPE_FRAME = 0; +const TYPE_ACK = 1; + const rooms = new Multimap(); class UWSContext { @@ -40,7 +46,16 @@ class UWSWrapper extends EventEmitter { }; this._uwsSocket.on('message', message => { - this._emit.apply(this, this._decode(message)); + try { + this._decode(message); + } catch (error) { + LOGGER.warn( + 'Decode failed (ip=%s): %s', + this.context.ipAddress, + error + ); + this.disconnect(); + } }); this._uwsSocket.on('close', () => { @@ -63,8 +78,17 @@ class UWSWrapper extends EventEmitter { return !this._connected; } - emit(frame, ...args) { - this._uwsSocket.send(encode(frame, args)); + emit(frame, payload) { + try { + this._uwsSocket.send(encode(frame, payload)); + } catch (error) { + LOGGER.error( + 'Emit failed (ip=%s): %s', + this.context.ipAddress, + error.stack + ); + this.disconnect(); + } } join(room) { @@ -78,16 +102,67 @@ class UWSWrapper extends EventEmitter { } typecheckedOn(frame, typeDef, cb) { - this.on(frame, cb); + this.on(frame, (data, ack) => { + typecheck(data, typeDef, (err, data) => { + if (err) { + this.emit('errorMsg', { + msg: 'Unexpected error for message ' + frame + ': ' + + err.message + }); + } else { + cb(data, ack); + } + }); + }); } typecheckedOnce(frame, typeDef, cb) { - this.once(frame, cb); + this.once(frame, (data, ack) => { + typecheck(data, typeDef, (err, data) => { + if (err) { + this.emit('errorMsg', { + msg: 'Unexpected error for message ' + frame + ': ' + + err.message + }); + } else { + cb(data, ack); + } + }); + }); + } + + _ack(ackId, payload) { + this._uwsSocket.send(JSON.stringify({ + type: TYPE_ACK, + ackId, + payload + })); } _decode(message) { - // TODO: handle error and kill clients with protocol violations - return JSON.parse(message); + const { frame, type, ackId, payload } = JSON.parse(message); + + if (type !== TYPE_FRAME) { + LOGGER.warn( + 'Unexpected message type %s from client; dropping', + type + ); + return; + } + + const args = [payload]; + + if (typeof ackId === 'number') { + args.push(payload => { + try { + this._ack(ackId, payload); + } catch (error) { + LOGGER.error('Error in ack callback: %s', error.stack); + } + }); + } + + this._emit(frame, ...args); } } @@ -148,8 +223,12 @@ class UWSServer extends EventEmitter { } } -function encode(frame, args) { - return JSON.stringify([frame].concat(args)); +function encode(frame, payload) { + return JSON.stringify({ + type: TYPE_FRAME, + frame, + payload + }); } function inRoom(room) { diff --git a/test/io/uws.js b/test/io/uws.js index d058ae3e..91c1d20c 100644 --- a/test/io/uws.js +++ b/test/io/uws.js @@ -19,9 +19,14 @@ describe('UWSServer', () => { socket.test = new EventEmitter(); socket.onmessage = message => { - const args = JSON.parse(message.data); - const frame = args.shift(); - socket.test.emit(frame, ...args); + const { type, frame, payload, ackId } = JSON.parse(message.data); + + if (type === 0) { + socket.test.emit(frame, payload); + } else if (type === 1) { + console.log(message.data); + socket.test.emit('ack', ackId, payload); + } }; socket.onerror = e => { throw e; }; @@ -80,4 +85,59 @@ describe('UWSServer', () => { done(); }); }); + + it('receives a normal frame', done => { + server.on('connection', s => { + s.on('test', data => { + assert.deepStrictEqual(data, {foo: 'bar'}); + done(); + }); + }); + + socket = connect(); + socket.onopen = () => { + socket.send(JSON.stringify({ + type: 0, + frame: 'test', + payload: { foo: 'bar' } + })); + }; + }); + + it('sends a normal frame', done => { + server.on('connection', s => { + s.emit('test', { foo: 'bar' }); + }); + + socket = connect(); + socket.test.on('test', data => { + assert.deepStrictEqual(data, { foo: 'bar' }); + done(); + }); + }); + + it('responds with an ack frame', done => { + server.on('connection', s => { + s.on('test', (data, ack) => { + assert.deepStrictEqual(data, {foo: 'bar'}); + ack({ baz: 'quux' }); + }); + }); + + socket = connect(); + socket.onopen = () => { + socket.send(JSON.stringify({ + type: 0, + frame: 'test', + payload: { foo: 'bar' }, + ackId: 1 + })); + + socket.test.on('ack', (ackId, payload) => { + assert.strictEqual(ackId, 1); + assert.deepStrictEqual(payload, { baz: 'quux' }); + done(); + }); + }; + }); }); diff --git a/www/js/ws.js b/www/js/ws.js index 227294dc..f737029c 100644 --- a/www/js/ws.js +++ b/www/js/ws.js @@ -1,4 +1,7 @@ (function () { + var TYPE_FRAME = 0; + var TYPE_ACK = 1; + function WSShim(ws) { this._ws = ws; this._listeners = Object.create(null); @@ -6,6 +9,9 @@ this._ws.onclose = this._onclose.bind(this); this._ws.onmessage = this._onmessage.bind(this); this._ws.onerror = this._onerror.bind(this); + + this._ackId = 0; + this._pendingAcks = Object.create(null); } WSShim.prototype.listeners = function listeners(frame) { @@ -20,20 +26,28 @@ this.listeners(frame).push(callback); }; - WSShim.prototype.emit = function emit(/* args */) { - var args = Array.prototype.slice.call(arguments).filter(function (it) { - // TODO: handle ack - return typeof it !== 'function'; - }); + WSShim.prototype.emit = function emit(frame, payload, ack) { + var message = { + type: TYPE_FRAME, + frame: frame, + payload: payload + }; - this._ws.send(JSON.stringify(args)); + if (ack && typeof ack === 'function') { + message.ackId = ++this._ackId; + this._pendingAcks[message.ackId] = ack; + } + + this._ws.send(JSON.stringify(message)); }; - WSShim.prototype._emit = function _emit(frame /*, args */) { - var args = Array.prototype.slice.call(arguments, 1); - + WSShim.prototype._emit = function _emit(frame, payload) { this.listeners(frame).forEach(function (cb) { - cb.apply(null, args); + try { + cb(payload); + } catch (error) { + console.error('Error in callback for ' + frame + ': ' + error); + } }); }; @@ -43,17 +57,25 @@ }; WSShim.prototype._onmessage = function _onmessage(message) { - var args; - try { - args = JSON.parse(message.data); + var parsed = JSON.parse(message.data); + console.log(parsed); + var type = parsed.type; + var frame = parsed.frame; + var payload = parsed.payload; + var ackId = parsed.ackId; + + if (type === TYPE_ACK && ackId in this._pendingAcks) { + this._pendingAcks[ackId](payload); + delete this._pendingAcks[ackId]; + } else if (type === TYPE_FRAME) { + this._emit(frame, payload); + } } catch (error) { console.error('Unparseable message from server: ' + message); console.error(error.stack); return; } - - this._emit.apply(this, args); }; WSShim.prototype._onerror = function _onerror() {