mirror of https://github.com/calzoneman/sync.git
Add io.throttle-in-rate-limit for socket event rate
This commit is contained in:
parent
db2361aee9
commit
67b1c97d89
|
@ -2,7 +2,7 @@
|
|||
"author": "Calvin Montgomery",
|
||||
"name": "CyTube",
|
||||
"description": "Online media synchronizer and chat",
|
||||
"version": "3.56.4",
|
||||
"version": "3.56.5",
|
||||
"repository": {
|
||||
"url": "http://github.com/calzoneman/sync"
|
||||
},
|
||||
|
|
|
@ -386,6 +386,12 @@ function preprocessConfig(cfg) {
|
|||
return contact.name !== 'calzoneman';
|
||||
});
|
||||
|
||||
if (!cfg.io.throttle) {
|
||||
cfg.io.throttle = {
|
||||
'in-rate-limit': Infinity
|
||||
};
|
||||
}
|
||||
|
||||
return cfg;
|
||||
}
|
||||
|
||||
|
|
|
@ -223,6 +223,8 @@ class IOServer {
|
|||
return;
|
||||
}
|
||||
|
||||
this.setRateLimiter(socket);
|
||||
|
||||
emitMetrics(socket);
|
||||
|
||||
LOGGER.info('Accepted socket from %s', socket.context.ipAddress);
|
||||
|
@ -240,6 +242,25 @@ class IOServer {
|
|||
}
|
||||
}
|
||||
|
||||
setRateLimiter(socket) {
|
||||
const thunk = () => Config.get('io.throttle.in-rate-limit');
|
||||
|
||||
socket._inRateLimit = new TokenBucket(thunk, thunk);
|
||||
|
||||
socket.on('cytube:count-event', () => {
|
||||
if (socket._inRateLimit.throttle()) {
|
||||
LOGGER.warn(
|
||||
'Kicking client %s: exceeded in-rate-limit of %d',
|
||||
socket.context.ipAddress,
|
||||
thunk()
|
||||
);
|
||||
|
||||
socket.emit('kick', { reason: 'Rate limit exceeded' });
|
||||
socket.disconnect();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
initSocketIO() {
|
||||
patchSocketMetrics();
|
||||
patchTypecheckedFunctions();
|
||||
|
@ -277,10 +298,12 @@ const outgoingPacketCount = new Counter({
|
|||
function patchSocketMetrics() {
|
||||
const onevent = Socket.prototype.onevent;
|
||||
const packet = Socket.prototype.packet;
|
||||
const emit = require('events').EventEmitter.prototype.emit;
|
||||
|
||||
Socket.prototype.onevent = function patchedOnevent() {
|
||||
onevent.apply(this, arguments);
|
||||
incomingEventCount.inc(1);
|
||||
emit.call(this, 'cytube:count-event');
|
||||
};
|
||||
|
||||
Socket.prototype.packet = function patchedPacket() {
|
||||
|
|
|
@ -1,16 +1,27 @@
|
|||
class TokenBucket {
|
||||
constructor(capacity, refillRate) {
|
||||
if (typeof refillRate !== 'function') {
|
||||
const _refillRate = refillRate;
|
||||
refillRate = () => _refillRate;
|
||||
}
|
||||
if (typeof capacity !== 'function') {
|
||||
const _capacity = capacity;
|
||||
capacity = () => _capacity;
|
||||
}
|
||||
|
||||
this.capacity = capacity;
|
||||
this.refillRate = refillRate;
|
||||
this.count = capacity;
|
||||
this.count = capacity();
|
||||
this.lastRefill = Date.now();
|
||||
}
|
||||
|
||||
throttle() {
|
||||
const now = Date.now();
|
||||
const delta = Math.floor((now - this.lastRefill) / 1000 * this.refillRate);
|
||||
const delta = Math.floor(
|
||||
(now - this.lastRefill) / 1000 * this.refillRate()
|
||||
);
|
||||
if (delta > 0) {
|
||||
this.count = Math.min(this.capacity, this.count + delta);
|
||||
this.count = Math.min(this.capacity(), this.count + delta);
|
||||
this.lastRefill = now;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue