Merge pull request #580 from calzoneman/partitioning

Implement sharding of channels across multiple instances
This commit is contained in:
Calvin Montgomery 2016-06-18 00:13:28 -07:00 committed by GitHub
commit e4decbc34f
14 changed files with 446 additions and 55 deletions

View File

@ -73,5 +73,7 @@ function handleLine(line) {
Logger.syslog.log("Switch " + args[0] + " is now " +
(Switches.isActive(args[0]) ? "ON" : "OFF"));
}
} else if (line.indexOf("/reload-partitions") === 0) {
sv.reloadPartitionMap();
}
}

View File

@ -2,7 +2,7 @@
"author": "Calvin Montgomery",
"name": "CyTube",
"description": "Online media synchronizer and chat",
"version": "3.16.1",
"version": "3.17.0",
"repository": {
"url": "http://github.com/calzoneman/sync"
},
@ -44,9 +44,9 @@
},
"scripts": {
"build-player": "$npm_node_execpath build-player.js",
"build-server": "babel --source-maps --loose es6.destructuring,es6.forOf --out-dir lib/ src/",
"build-server": "babel -D --source-maps --loose es6.destructuring,es6.forOf --out-dir lib/ src/",
"postinstall": "./postinstall.sh",
"server-dev": "babel --watch --source-maps --loose es6.destructuring,es6.forOf --out-dir lib/ src/"
"server-dev": "babel -D --watch --source-maps --loose es6.destructuring,es6.forOf --out-dir lib/ src/"
},
"devDependencies": {
"coffee-script": "^1.9.2"

View File

@ -0,0 +1,14 @@
import Promise from 'bluebird';
class PartitionClusterClient {
constructor(partitionDecider) {
this.partitionDecider = partitionDecider;
}
getSocketConfig(channel) {
return Promise.resolve(
this.partitionDecider.getPartitionForChannel(channel));
}
}
export { PartitionClusterClient };

View File

@ -0,0 +1,66 @@
import Promise from 'bluebird';
import uuid from 'uuid';
import { runLuaScript } from 'cytube-common/lib/redis/lualoader';
import path from 'path';
import Logger from '../logger';
var SERVER = null;
const CHANNEL_INDEX = 'publicChannelList';
const CACHE_REFRESH_INTERVAL = 30 * 1000;
const CACHE_EXPIRE_DELAY = 40 * 1000;
const READ_CHANNEL_LIST = path.join(__dirname, 'read_channel_list.lua')
class PartitionChannelIndex {
constructor(redisClient) {
this.redisClient = redisClient;
this.uid = uuid.v4();
this.cachedList = [];
this.redisClient.on('error', error => {
Logger.errlog.log(`Redis error: ${error}`);
});
process.nextTick(() => {
SERVER = require('../server').getServer();
this.refreshCache();
setInterval(this.refreshCache.bind(this), CACHE_REFRESH_INTERVAL);
});
}
refreshCache() {
this.publishLocalChannels();
runLuaScript(this.redisClient, READ_CHANNEL_LIST, [
0,
Date.now() - CACHE_EXPIRE_DELAY
]).then(result => {
this.cachedList = JSON.parse(result);
}).catch(error => {
Logger.errlog.log(`Failed to refresh channel list: ${error.stack}`);
});
}
publishLocalChannels() {
const channels = SERVER.packChannelList(true).map(channel => {
return {
name: channel.name,
mediatitle: channel.mediatitle,
pagetitle: channel.pagetitle,
usercount: channel.usercount
};
});
const entry = JSON.stringify({
timestamp: Date.now(),
channels
});
this.redisClient.hsetAsync(CHANNEL_INDEX, this.uid, entry).catch(error => {
Logger.errlog.log(`Failed to publish local channel list: ${error.stack}`);
});
}
listPublicChannels() {
return Promise.resolve(this.cachedList);
}
}
export { PartitionChannelIndex };

View File

@ -0,0 +1,27 @@
class PartitionConfig {
constructor(config) {
this.config = config;
}
getPartitionMap() {
return this.config.partitions;
}
getOverrideMap() {
return this.config.overrides;
}
getPool() {
return this.config.pool;
}
getIdentity() {
return this.config.identity;
}
getRedisConfig() {
return this.config.redis;
}
}
export { PartitionConfig };

View File

@ -0,0 +1,30 @@
import { murmurHash1 } from '../util/murmur';
class PartitionDecider {
constructor(config) {
this.config = config;
}
getPartitionForChannel(channel) {
const partitionMap = this.config.getPartitionMap();
return partitionMap[this.getPartitionIdentityForChannel(channel)];
}
getPartitionIdentityForChannel(channel) {
const overrideMap = this.config.getOverrideMap();
if (overrideMap.hasOwnProperty(channel)) {
return overrideMap[channel];
} else {
const pool = this.config.getPool();
const i = murmurHash1(channel) % pool.length;
return pool[i];
}
}
isChannelOnThisPartition(channel) {
return this.getPartitionIdentityForChannel(channel) ===
this.config.getIdentity();
}
}
export { PartitionDecider };

View File

@ -0,0 +1,74 @@
import { loadFromToml } from 'cytube-common/lib/configuration/configloader';
import { PartitionConfig } from './partitionconfig';
import { PartitionDecider } from './partitiondecider';
import { PartitionClusterClient } from '../io/cluster/partitionclusterclient';
import RedisClientProvider from 'cytube-common/lib/redis/redisclientprovider';
import logger from 'cytube-common/lib/logger';
import LegacyConfig from '../config';
import path from 'path';
const PARTITION_CONFIG_PATH = path.resolve(__dirname, '..', '..', 'conf',
'partitions.toml');
class PartitionModule {
constructor() {
this.initConfig();
}
onReady() {
}
initConfig() {
logger.initialize(null, null, LegacyConfig.get('debug'));
try {
this.partitionConfig = this.loadPartitionMap();
} catch (error) {
process.exit(1);
}
}
loadPartitionMap() {
try {
return loadFromToml(PartitionConfig, PARTITION_CONFIG_PATH);
} catch (error) {
if (typeof error.line !== 'undefined') {
logger.error(`Error in ${PARTITION_CONFIG_PATH}: ${error} ` +
`(line ${error.line})`);
} else {
logger.error(`Error loading ${PARTITION_CONFIG_PATH}: ` +
`${error.stack}`);
}
throw error;
}
}
getPartitionDecider() {
if (!this.partitionDecider) {
this.partitionDecider = new PartitionDecider(this.partitionConfig);
}
return this.partitionDecider;
}
getClusterClient() {
if (!this.partitionClusterClient) {
this.partitionClusterClient = new PartitionClusterClient(
this.getPartitionDecider());
}
return this.partitionClusterClient;
}
getRedisClientProvider() {
if (!this.redisClientProvider) {
this.redisClientProvider = new RedisClientProvider(
this.partitionConfig.getRedisConfig()
);
}
return this.redisClientProvider;
}
}
export { PartitionModule };

View File

@ -0,0 +1,30 @@
local entries = redis.call('hgetall', 'publicChannelList')
if #entries == 0 then
return '[]'
end
local channelList = {}
-- ARGV[1] holds the expiration timestamp. Anything older than this
-- will be discarded.
local expiration = tonumber(ARGV[1])
for i = 1, #entries, 2 do
local uid = entries[i]
local entry = cjson.decode(entries[i+1])
local timestamp = tonumber(entry['timestamp'])
if timestamp < expiration then
redis.call('hdel', 'publicChannelList', uid)
else
local channels = entry['channels']
for j = 1, #channels do
channelList[#channelList+1] = channels[j]
end
end
end
-- Necessary to check for this condition because
-- if the table is empty, cjson will encode it as an object ('{}')
if #channelList == 0 then
return '[]'
else
return cjson.encode(channelList)
end

View File

@ -43,11 +43,13 @@ var db = require("./database");
var Flags = require("./flags");
var sio = require("socket.io");
import LocalChannelIndex from './web/localchannelindex';
import { PartitionChannelIndex } from './partition/partitionchannelindex';
import IOConfiguration from './configuration/ioconfig';
import WebConfiguration from './configuration/webconfig';
import NullClusterClient from './io/cluster/nullclusterclient';
import session from './session';
import { LegacyModule } from './legacymodule';
import { PartitionModule } from './partition/partitionmodule';
import * as Switches from './switches';
var Server = function () {
@ -67,9 +69,12 @@ var Server = function () {
Switches.setActive(Switches.DUAL_BACKEND, true);
}
const BackendModule = require('./backend/backendmodule').BackendModule;
initModule = new BackendModule();
initModule = this.initModule = new BackendModule();
} else if (Config.get('enable-partition')) {
initModule = this.initModule = new PartitionModule();
self.partitionDecider = initModule.getPartitionDecider();
} else {
initModule = new LegacyModule();
initModule = this.initModule = new LegacyModule();
}
// database init ------------------------------------------------------
@ -82,7 +87,14 @@ var Server = function () {
const ioConfig = IOConfiguration.fromOldConfig(Config);
const webConfig = WebConfiguration.fromOldConfig(Config);
const clusterClient = initModule.getClusterClient();
const channelIndex = new LocalChannelIndex();
var channelIndex;
if (Config.get("enable-partition")) {
channelIndex = new PartitionChannelIndex(
initModule.getRedisClientProvider().get()
);
} else {
channelIndex = new LocalChannelIndex();
}
self.express = express();
require("./web/webserver").init(self.express,
webConfig,
@ -183,8 +195,15 @@ Server.prototype.isChannelLoaded = function (name) {
};
Server.prototype.getChannel = function (name) {
var self = this;
var cname = name.toLowerCase();
if (this.partitionDecider &&
!this.partitionDecider.isChannelOnThisPartition(cname)) {
const error = new Error(`Channel '${cname}' is mapped to a different partition`);
error.code = 'EWRONGPART';
throw error;
}
var self = this;
for (var i = 0; i < self.channels.length; i++) {
if (self.channels[i].uniqueName === cname)
return self.channels[i];
@ -291,3 +310,43 @@ Server.prototype.shutdown = function () {
process.exit(1);
});
};
Server.prototype.reloadPartitionMap = function () {
if (!Config.get("enable-partition")) {
return;
}
var config;
try {
config = this.initModule.loadPartitionMap();
} catch (error) {
return;
}
this.initModule.partitionConfig.config = config.config;
const channels = Array.prototype.slice.call(this.channels);
Promise.reduce(channels, (_, channel) => {
if (channel.dead) {
return;
}
if (!this.partitionDecider.isChannelOnThisPartition(channel.uniqueName)) {
Logger.syslog.log("Partition changed for " + channel.uniqueName);
return channel.saveState().then(() => {
channel.broadcastAll("partitionChange",
this.partitionDecider.getPartitionForChannel(channel.uniqueName));
const users = Array.prototype.slice.call(channel.users);
users.forEach(u => {
try {
u.socket.disconnect();
} catch (error) {
}
});
this.unloadChannel(channel);
});
}
}, 0).then(() => {
Logger.syslog.log("Partition reload complete");
});
};

View File

@ -53,7 +53,20 @@ function User(socket) {
}
self.waitFlag(Flags.U_READY, function () {
var chan = Server.getServer().getChannel(data.name);
var chan;
try {
chan = Server.getServer().getChannel(data.name);
} catch (error) {
if (error.code !== 'EWRONGPART') {
throw error;
}
self.socket.emit("errorMsg", {
msg: "Channel '" + data.name + "' is hosted on another server. " +
"Try refreshing the page to update the connection URL."
});
return;
}
chan.joinUser(self, data);
});
});

35
src/util/murmur.js Normal file
View File

@ -0,0 +1,35 @@
const SEED = 0x1234;
const M = 0xc6a4a793;
const R = 16;
export function murmurHash1(str) {
const buffer = new Buffer(str, 'utf8');
var length = buffer.length;
var h = SEED ^ (length * M);
while (length >= 4) {
var k = buffer.readUInt32LE(buffer.length - length);
h += k;
h *= M;
h ^= h >> 16;
length -= 4;
}
switch (length) {
case 3:
h += buffer[buffer.length - 3] >> 16;
case 2:
h += buffer[buffer.length - 2] >> 8;
case 1:
h += buffer[buffer.length - 1];
h *= M;
h ^= h >> R;
}
h *= M;
h ^= h >> 10;
h *= M;
h ^= h >> 17;
return h;
}

View File

@ -236,7 +236,7 @@ html(lang="en")
#pmbar
include footer
mixin footer()
script(src=sioSource)
script(id="socketio-js", src=sioSource)
script(src="/js/data.js")
script(src="/js/util.js")
script(src="/js/player.js")

View File

@ -6,7 +6,9 @@ Callbacks = {
/* fired when socket connection completes */
connect: function() {
socket.emit("initChannelCallbacks");
HAS_CONNECTED_BEFORE = true;
SOCKETIO_CONNECT_ERROR_COUNT = 0;
$("#socketio-connect-error").remove();
socket.emit("joinChannel", {
name: CHANNEL.name
});
@ -1029,6 +1031,13 @@ Callbacks = {
"unneeded playlist items, filters, and/or emotes. Changes to the channel " +
"will not be saved until the size is reduced to under the limit.")
.attr("id", "chandumptoobig");
},
partitionChange: function (socketConfig) {
window.socket.disconnect();
HAS_CONNECTED_BEFORE = false;
ioServerConnect(socketConfig);
setupCallbacks();
}
}
@ -1050,14 +1059,84 @@ setupCallbacks = function() {
});
})(key);
}
socket.on("connect_error", function (error) {
// If the socket has connected at least once during this
// session and now gets a connect error, it is likely because
// the server is down temporarily and not because of any configuration
// issue. Therefore, omit the warning message about refreshing.
if (HAS_CONNECTED_BEFORE) {
return;
}
SOCKETIO_CONNECT_ERROR_COUNT++;
if (SOCKETIO_CONNECT_ERROR_COUNT >= 3 &&
$("#socketio-connect-error").length === 0) {
var message = "Failed to connect to the server. Try clearing your " +
"cache and refreshing the page.";
makeAlert("Error", message, "alert-danger")
.attr("id", "socketio-connect-error")
.appendTo($("#announcements"));
}
});
};
function ioServerConnect(socketConfig) {
if (socketConfig.error) {
makeAlert("Error", "Socket.io configuration returned error: " +
socketConfig.error, "alert-danger")
.appendTo($("#announcements"));
return;
}
var servers;
if (socketConfig.alt && socketConfig.alt.length > 0 &&
localStorage.useAltServer === "true") {
servers = socketConfig.alt;
console.log("Using alt servers: " + JSON.stringify(servers));
} else {
servers = socketConfig.servers;
}
var chosenServer = null;
servers.forEach(function (server) {
if (chosenServer === null) {
chosenServer = server;
} else if (server.secure && !chosenServer.secure) {
chosenServer = server;
} else if (!server.ipv6Only && chosenServer.ipv6Only) {
chosenServer = server;
}
});
console.log("Connecting to " + JSON.stringify(chosenServer));
if (chosenServer === null) {
makeAlert("Error",
"Socket.io configuration was unable to find a suitable server",
"alert-danger")
.appendTo($("#announcements"));
}
var opts = {
secure: chosenServer.secure
};
window.socket = io(chosenServer.url, opts);
}
(function () {
if (typeof io === "undefined") {
makeAlert("Uh oh!", "It appears the socket.io connection " +
"has failed. If this error persists, a firewall or " +
"antivirus is likely blocking the connection, or the " +
"server is down.", "alert-danger")
var script = document.getElementById("socketio-js");
var source = "unknown";
if (script) {
source = script.src;
}
var message = "The socket.io library could not be loaded from <code>" +
source + "</code>. Ensure that it is not being blocked " +
"by a script blocking extension or firewall and try again.";
makeAlert("Error", message, "alert-danger")
.appendTo($("#announcements"));
Callbacks.disconnect();
return;
@ -1065,47 +1144,7 @@ setupCallbacks = function() {
$.getJSON("/socketconfig/" + CHANNEL.name + ".json")
.done(function (socketConfig) {
if (socketConfig.error) {
makeAlert("Error", "Socket.io configuration returned error: " +
socketConfig.error, "alert-danger")
.appendTo($("#announcements"));
return;
}
var servers;
if (socketConfig.alt && socketConfig.alt.length > 0 &&
localStorage.useAltServer === "true") {
servers = socketConfig.alt;
console.log("Using alt servers: " + JSON.stringify(servers));
} else {
servers = socketConfig.servers;
}
var chosenServer = null;
servers.forEach(function (server) {
if (chosenServer === null) {
chosenServer = server;
} else if (server.secure && !chosenServer.secure) {
chosenServer = server;
} else if (!server.ipv6Only && chosenServer.ipv6Only) {
chosenServer = server;
}
});
console.log("Connecting to " + JSON.stringify(chosenServer));
if (chosenServer === null) {
makeAlert("Error",
"Socket.io configuration was unable to find a suitable server",
"alert-danger")
.appendTo($("#announcements"));
}
var opts = {
secure: chosenServer.secure
};
socket = io(chosenServer.url, opts);
ioServerConnect(socketConfig);
setupCallbacks();
}).fail(function () {
makeAlert("Error", "Failed to retrieve socket.io configuration",

View File

@ -65,6 +65,8 @@ var PL_WAIT_SCROLL = false;
var FILTER_FROM = 0;
var FILTER_TO = 0;
var NO_STORAGE = typeof localStorage == "undefined" || localStorage === null;
var SOCKETIO_CONNECT_ERROR_COUNT = 0;
var HAS_CONNECTED_BEFORE = false;
function getOpt(k) {
var v = NO_STORAGE ? readCookie(k) : localStorage.getItem(k);