Add uws middleware

This commit is contained in:
Calvin Montgomery 2018-06-17 20:28:01 -07:00
parent 61b856c2c9
commit d369f1ebe4
5 changed files with 183 additions and 27 deletions

View File

@ -18,6 +18,7 @@ import { Counter, Gauge } from 'prom-client';
import Socket from 'socket.io/lib/socket';
import { TokenBucket } from '../util/token-bucket';
import http from 'http';
import { UWSServer } from './uws';
const LOGGER = require('@calzoneman/jsli')('ioserver');
@ -268,6 +269,19 @@ class IOServer {
io.on('connection', this.handleConnection.bind(this));
}
initUWS() {
const uws = this.uws = new UWSServer();
uws.use(this.ipProxyMiddleware.bind(this));
uws.use(this.ipBanMiddleware.bind(this));
uws.use(this.ipThrottleMiddleware.bind(this));
uws.use(this.cookieParsingMiddleware.bind(this));
uws.use(this.ipSessionCookieMiddleware.bind(this));
uws.use(this.authUserMiddleware.bind(this));
uws.use(this.metricsEmittingMiddleware.bind(this));
uws.on('connection', this.handleConnection.bind(this));
}
bindTo(servers) {
if (!this.io) {
throw new Error('Cannot bind: socket.io has not been initialized yet');
@ -407,6 +421,7 @@ module.exports = {
});
ioServer.initSocketIO();
ioServer.initUWS();
const uniqueListenAddresses = new Set();
const servers = [];

View File

@ -1,17 +1,43 @@
import { EventEmitter } from 'events';
import { Multimap } from '../util/multimap';
import User from '../user';
import clone from 'clone';
import uws from 'uws';
const rooms = new Multimap();
class UWSContext {
constructor(upgradeReq) {
this.upgradeReq = upgradeReq;
this.ipAddress = null;
this.torConnection = null;
this.ipSessionFirstSeen = null;
this.user = null;
}
}
class UWSWrapper extends EventEmitter {
constructor(socket, context) {
constructor(socket) {
super();
this._uwsSocket = socket;
this._joined = new Set();
this._connected = true;
this.context = context;
this.context = new UWSContext({
connection: {
remoteAddress: socket._socket.remoteAddress
},
headers: clone(socket.upgradeReq.headers)
});
// socket.io metrics compatibility
this.client = {
conn: {
on: function(){},
transport: {
name: 'uws'
}
}
};
this._uwsSocket.on('message', message => {
this._emit.apply(this, this._decode(message));
@ -67,6 +93,61 @@ class UWSWrapper extends EventEmitter {
Object.assign(UWSWrapper.prototype, { _emit: EventEmitter.prototype.emit });
class UWSServer extends EventEmitter {
constructor() {
super();
this._server = new uws.Server({ port: 3000, host: '127.0.0.1' });
this._middleware = [];
this._server.on('connection', socket => this._onConnection(socket));
this._server.on('listening', () => this.emit('listening'));
this._server.on('error', e => this.emit('error', e));
}
use(cb) {
this._middleware.push(cb);
}
_onConnection(uwsSocket) {
const socket = new UWSWrapper(uwsSocket);
if (this._middleware.length === 0) {
this._acceptConnection(socket);
return;
}
let i = 0;
const self = this;
function next(error) {
if (error) {
socket.emit('error', error.message);
socket.disconnect();
return;
}
if (i >= self._middleware.length) {
self._acceptConnection(socket);
return;
}
process.nextTick(self._middleware[i], socket, next);
i++;
}
process.nextTick(next, null);
}
_acceptConnection(socket) {
socket.emit('connect');
this.emit('connection', socket);
}
shutdown() {
this._server.close();
}
}
function encode(frame, args) {
return JSON.stringify([frame].concat(args));
}
@ -83,22 +164,5 @@ function inRoom(room) {
};
}
export { UWSWrapper };
export { UWSServer };
exports['in'] = inRoom;
export function init() {
const uws = require('uws');
const server = new uws.Server({ port: 3000 });
server.on('connection', socket => {
const context = {
aliases: [],
ipSessionFirstSeen: new Date(),
torConnection: false,
ipAddress: null
};
const wrap = new UWSWrapper(socket, context);
new User(wrap, '127.0.0.1', null);
});
}

View File

@ -190,7 +190,6 @@ var Server = function () {
});
require("./io/ioserver").init(self, webConfig);
require("./io/uws").init();
// background tasks init ----------------------------------------------
require("./bgtask")(self);

83
test/io/uws.js Normal file
View File

@ -0,0 +1,83 @@
const { EventEmitter } = require('events');
const assert = require('assert');
const { UWSServer } = require('../../lib/io/uws');
const WebSocket = require('uws');
describe('UWSServer', () => {
const endpoint = 'ws://127.0.0.1:3000';
let server;
let socket;
beforeEach(done => {
server = new UWSServer();
server.on('error', e => { throw e; });
server.once('listening', done);
});
function connect() {
let socket = new WebSocket(endpoint);
socket.test = new EventEmitter();
socket.onmessage = message => {
const args = JSON.parse(message.data);
const frame = args.shift();
socket.test.emit(frame, ...args);
};
socket.onerror = e => { throw e; };
return socket;
}
afterEach(() => {
if (socket) socket.terminate();
socket = null;
if (server) server.shutdown();
server = null;
});
it('accepts a connection immediately if there is no middleware', done => {
socket = connect();
socket.test.on('connect', done);
});
it('accepts a connection with middleware', done => {
let m1 = false, m2 = false;
server.use((socket, next) => {
m1 = true;
next();
});
server.use((socket, next) => {
m2 = true;
next();
});
socket = connect();
socket.test.on('connect', () => {
assert(m1);
assert(m2);
done();
});
});
it('rejects a connection with middleware', done => {
let m1 = false, m2 = false;
server.use((socket, next) => {
m1 = true;
next(new Error('broken'));
});
server.use((socket, next) => {
m2 = true;
next();
});
socket = connect();
socket.test.on('connect', () => {
throw new Error('Unexpected connect callback');
});
socket.test.on('error', e => {
assert.strictEqual(e, 'broken');
assert(!m2);
done();
});
});
});

View File

@ -3,7 +3,6 @@
this._ws = ws;
this._listeners = Object.create(null);
this._ws.onopen = this._onopen.bind(this);
this._ws.onclose = this._onclose.bind(this);
this._ws.onmessage = this._onmessage.bind(this);
this._ws.onerror = this._onerror.bind(this);
@ -38,10 +37,6 @@
});
};
WSShim.prototype._onopen = function _onopen() {
this._emit('connect');
};
WSShim.prototype._onclose = function _onclose() {
// TODO: reconnect logic
this._emit('disconnect');