diff --git a/src/io/ioserver.js b/src/io/ioserver.js index 03095b69..390b621d 100644 --- a/src/io/ioserver.js +++ b/src/io/ioserver.js @@ -18,6 +18,7 @@ import { Counter, Gauge } from 'prom-client'; import Socket from 'socket.io/lib/socket'; import { TokenBucket } from '../util/token-bucket'; import http from 'http'; +import { UWSServer } from './uws'; const LOGGER = require('@calzoneman/jsli')('ioserver'); @@ -268,6 +269,19 @@ class IOServer { io.on('connection', this.handleConnection.bind(this)); } + initUWS() { + const uws = this.uws = new UWSServer(); + + uws.use(this.ipProxyMiddleware.bind(this)); + uws.use(this.ipBanMiddleware.bind(this)); + uws.use(this.ipThrottleMiddleware.bind(this)); + uws.use(this.cookieParsingMiddleware.bind(this)); + uws.use(this.ipSessionCookieMiddleware.bind(this)); + uws.use(this.authUserMiddleware.bind(this)); + uws.use(this.metricsEmittingMiddleware.bind(this)); + uws.on('connection', this.handleConnection.bind(this)); + } + bindTo(servers) { if (!this.io) { throw new Error('Cannot bind: socket.io has not been initialized yet'); @@ -407,6 +421,7 @@ module.exports = { }); ioServer.initSocketIO(); + ioServer.initUWS(); const uniqueListenAddresses = new Set(); const servers = []; diff --git a/src/io/uws.js b/src/io/uws.js index 4b4c6f98..6485f6a1 100644 --- a/src/io/uws.js +++ b/src/io/uws.js @@ -1,17 +1,43 @@ import { EventEmitter } from 'events'; import { Multimap } from '../util/multimap'; -import User from '../user'; +import clone from 'clone'; +import uws from 'uws'; const rooms = new Multimap(); +class UWSContext { + constructor(upgradeReq) { + this.upgradeReq = upgradeReq; + this.ipAddress = null; + this.torConnection = null; + this.ipSessionFirstSeen = null; + this.user = null; + } +} + class UWSWrapper extends EventEmitter { - constructor(socket, context) { + constructor(socket) { super(); this._uwsSocket = socket; this._joined = new Set(); this._connected = true; - this.context = context; + + this.context = new UWSContext({ + connection: { + remoteAddress: socket._socket.remoteAddress + }, + headers: clone(socket.upgradeReq.headers) + }); + // socket.io metrics compatibility + this.client = { + conn: { + on: function(){}, + transport: { + name: 'uws' + } + } + }; this._uwsSocket.on('message', message => { this._emit.apply(this, this._decode(message)); @@ -67,6 +93,61 @@ class UWSWrapper extends EventEmitter { Object.assign(UWSWrapper.prototype, { _emit: EventEmitter.prototype.emit }); +class UWSServer extends EventEmitter { + constructor() { + super(); + + this._server = new uws.Server({ port: 3000, host: '127.0.0.1' }); + this._middleware = []; + + this._server.on('connection', socket => this._onConnection(socket)); + this._server.on('listening', () => this.emit('listening')); + this._server.on('error', e => this.emit('error', e)); + } + + use(cb) { + this._middleware.push(cb); + } + + _onConnection(uwsSocket) { + const socket = new UWSWrapper(uwsSocket); + + if (this._middleware.length === 0) { + this._acceptConnection(socket); + return; + } + + let i = 0; + const self = this; + function next(error) { + if (error) { + socket.emit('error', error.message); + socket.disconnect(); + return; + } + + if (i >= self._middleware.length) { + self._acceptConnection(socket); + return; + } + + process.nextTick(self._middleware[i], socket, next); + i++; + } + + process.nextTick(next, null); + } + + _acceptConnection(socket) { + socket.emit('connect'); + this.emit('connection', socket); + } + + shutdown() { + this._server.close(); + } +} + function encode(frame, args) { return JSON.stringify([frame].concat(args)); } @@ -83,22 +164,5 @@ function inRoom(room) { }; } -export { UWSWrapper }; +export { UWSServer }; exports['in'] = inRoom; - -export function init() { - const uws = require('uws'); - - const server = new uws.Server({ port: 3000 }); - - server.on('connection', socket => { - const context = { - aliases: [], - ipSessionFirstSeen: new Date(), - torConnection: false, - ipAddress: null - }; - const wrap = new UWSWrapper(socket, context); - new User(wrap, '127.0.0.1', null); - }); -} diff --git a/src/server.js b/src/server.js index ba256ed8..1ed26111 100644 --- a/src/server.js +++ b/src/server.js @@ -190,7 +190,6 @@ var Server = function () { }); require("./io/ioserver").init(self, webConfig); - require("./io/uws").init(); // background tasks init ---------------------------------------------- require("./bgtask")(self); diff --git a/test/io/uws.js b/test/io/uws.js new file mode 100644 index 00000000..d058ae3e --- /dev/null +++ b/test/io/uws.js @@ -0,0 +1,83 @@ +const { EventEmitter } = require('events'); +const assert = require('assert'); +const { UWSServer } = require('../../lib/io/uws'); +const WebSocket = require('uws'); + +describe('UWSServer', () => { + const endpoint = 'ws://127.0.0.1:3000'; + + let server; + let socket; + beforeEach(done => { + server = new UWSServer(); + server.on('error', e => { throw e; }); + server.once('listening', done); + }); + + function connect() { + let socket = new WebSocket(endpoint); + socket.test = new EventEmitter(); + + socket.onmessage = message => { + const args = JSON.parse(message.data); + const frame = args.shift(); + socket.test.emit(frame, ...args); + }; + socket.onerror = e => { throw e; }; + + return socket; + } + + afterEach(() => { + if (socket) socket.terminate(); + socket = null; + if (server) server.shutdown(); + server = null; + }); + + it('accepts a connection immediately if there is no middleware', done => { + socket = connect(); + socket.test.on('connect', done); + }); + + it('accepts a connection with middleware', done => { + let m1 = false, m2 = false; + server.use((socket, next) => { + m1 = true; + next(); + }); + server.use((socket, next) => { + m2 = true; + next(); + }); + + socket = connect(); + socket.test.on('connect', () => { + assert(m1); + assert(m2); + done(); + }); + }); + + it('rejects a connection with middleware', done => { + let m1 = false, m2 = false; + server.use((socket, next) => { + m1 = true; + next(new Error('broken')); + }); + server.use((socket, next) => { + m2 = true; + next(); + }); + + socket = connect(); + socket.test.on('connect', () => { + throw new Error('Unexpected connect callback'); + }); + socket.test.on('error', e => { + assert.strictEqual(e, 'broken'); + assert(!m2); + done(); + }); + }); +}); diff --git a/www/js/ws.js b/www/js/ws.js index 807fee4d..227294dc 100644 --- a/www/js/ws.js +++ b/www/js/ws.js @@ -3,7 +3,6 @@ this._ws = ws; this._listeners = Object.create(null); - this._ws.onopen = this._onopen.bind(this); this._ws.onclose = this._onclose.bind(this); this._ws.onmessage = this._onmessage.bind(this); this._ws.onerror = this._onerror.bind(this); @@ -38,10 +37,6 @@ }); }; - WSShim.prototype._onopen = function _onopen() { - this._emit('connect'); - }; - WSShim.prototype._onclose = function _onclose() { // TODO: reconnect logic this._emit('disconnect');