Handle errors in broadcast emit

This commit is contained in:
Calvin Montgomery 2018-06-21 21:39:08 -07:00
parent 3547a51f2c
commit f0ba3a998a
2 changed files with 75 additions and 30 deletions

View File

@ -27,7 +27,7 @@ class UWSWrapper extends EventEmitter {
this._uwsSocket = socket;
this._joined = new Set();
this._connected = true;
this.disconnected = false;
this.context = new UWSContext({
connection: {
@ -50,7 +50,7 @@ class UWSWrapper extends EventEmitter {
this._decode(message);
} catch (error) {
LOGGER.warn(
'Decode failed (ip=%s): %s',
'Decode failed for client %s: %s',
this.context.ipAddress,
error
);
@ -59,7 +59,7 @@ class UWSWrapper extends EventEmitter {
});
this._uwsSocket.on('close', () => {
this._connected = false;
this.disconnected = true;
for (let room of this._joined) {
rooms.delete(room, this);
@ -68,27 +68,23 @@ class UWSWrapper extends EventEmitter {
this._joined.clear();
this._emit('disconnect');
});
this._uwsSocket.on('error', error => {
// TODO: determine what conditions cause this
LOGGER.error(
'Error for client %s: %s',
this.context.ipAddress,
error.stack
);
});
}
disconnect() {
this._uwsSocket.terminate();
}
get disconnected() {
return !this._connected;
}
emit(frame, payload) {
try {
this._uwsSocket.send(encode(frame, payload));
} catch (error) {
LOGGER.error(
'Emit failed (ip=%s): %s',
this.context.ipAddress,
error.stack
);
this.disconnect();
}
sendSafe(this, encode(frame, payload));
}
join(room) {
@ -132,11 +128,14 @@ class UWSWrapper extends EventEmitter {
}
_ack(ackId, payload) {
this._uwsSocket.send(JSON.stringify({
sendSafe(
this,
JSON.stringify({
type: TYPE_ACK,
ackId,
payload
}));
})
);
}
_decode(message) {
@ -153,13 +152,7 @@ class UWSWrapper extends EventEmitter {
const args = [payload];
if (typeof ackId === 'number') {
args.push(payload => {
try {
this._ack(ackId, payload);
} catch (error) {
LOGGER.error('Error in ack callback: %s', error.stack);
}
});
args.push(payload => this._ack(ackId, payload));
}
this._emit(frame, ...args);
@ -231,13 +224,26 @@ function encode(frame, payload) {
});
}
function sendSafe(socket, message) {
try {
socket._uwsSocket.send(message);
} catch (error) {
LOGGER.error(
'Error sending to client %s: %s',
socket.context.ipAddress,
error.stack
);
socket.disconnect();
}
}
function inRoom(room) {
return {
emit(frame, payload) {
const encoded = encode(frame, payload);
for (let wrapper of rooms.get(room)) {
wrapper._uwsSocket.send(encoded);
sendSafe(wrapper, encoded);
}
}
};

View File

@ -179,4 +179,43 @@ describe('UWSServer', () => {
});
};
});
it('catches errors during socket.emit()', done => {
server.on('connection', s => {
s.join('testroom');
s._uwsSocket.send = () => { throw new Error('well darn'); };
s.emit('test', { foo: 'bar' });
done();
});
socket = connect();
});
it('catches errors during inRoom().emit()', done => {
server.on('connection', s => {
s.join('testroom');
s._uwsSocket.send = () => { throw new Error('well darn'); };
inRoom('testroom').emit('test', { foo: 'bar' });
done();
});
socket = connect();
});
it('sets disconnected = true after a disconnect', done => {
server.on('connection', s => {
assert.strictEqual(s.disconnected, false);
s.on('disconnect', () => {
assert.strictEqual(s.disconnected, true);
done();
});
s.disconnect();
});
socket = connect();
});
});