From 77465e6b49018c8d9468be2ad42e3075770a1673 Mon Sep 17 00:00:00 2001 From: calzoneman Date: Mon, 6 Jun 2016 21:54:49 -0700 Subject: [PATCH 1/7] Add partitioning logic --- src/io/cluster/partitionclusterclient.js | 15 ++++++ src/partition/partitionconfig.js | 23 +++++++++ src/partition/partitiondecider.js | 29 +++++++++++ src/partition/partitionmodule.js | 63 ++++++++++++++++++++++++ src/server.js | 3 ++ src/util/murmur.js | 35 +++++++++++++ 6 files changed, 168 insertions(+) create mode 100644 src/io/cluster/partitionclusterclient.js create mode 100644 src/partition/partitionconfig.js create mode 100644 src/partition/partitiondecider.js create mode 100644 src/partition/partitionmodule.js create mode 100644 src/util/murmur.js diff --git a/src/io/cluster/partitionclusterclient.js b/src/io/cluster/partitionclusterclient.js new file mode 100644 index 00000000..9d5324ef --- /dev/null +++ b/src/io/cluster/partitionclusterclient.js @@ -0,0 +1,15 @@ +import Promise from 'bluebird'; + +class PartitionClusterClient { + constructor(partitionDecider) { + this.partitionDecider = partitionDecider; + } + + getSocketConfig(channel) { + return Promise.resolve({ + servers: this.partitionDecider.getPartitionForChannel(channel) + }); + } +} + +export { PartitionClusterClient }; diff --git a/src/partition/partitionconfig.js b/src/partition/partitionconfig.js new file mode 100644 index 00000000..7684b95e --- /dev/null +++ b/src/partition/partitionconfig.js @@ -0,0 +1,23 @@ +class PartitionConfig { + constructor(config) { + this.config = config; + } + + getPartitionMap() { + return this.config.partitions; + } + + getOverrideMap() { + return this.config.overrides; + } + + getPool() { + return this.config.pool; + } + + getIdentity() { + return this.config.identity; + } +} + +export { PartitionConfig }; diff --git a/src/partition/partitiondecider.js b/src/partition/partitiondecider.js new file mode 100644 index 00000000..425d6207 --- /dev/null +++ b/src/partition/partitiondecider.js @@ -0,0 +1,29 @@ +import { murmurHash1 } from '../util/murmur'; + +class PartitionDecider { + constructor(config) { + this.identity = config.getIdentity(); + this.partitionMap = config.getPartitionMap(); + this.pool = config.getPool(); + this.overrideMap = config.getOverrideMap(); + } + + getPartitionForChannel(channel) { + return this.partitionMap[this.getPartitionIdentityForChannel(channel)]; + } + + getPartitionIdentityForChannel(channel) { + if (this.overrideMap.hasOwnProperty(channel)) { + return this.overrideMap[channel]; + } else { + const i = murmurHash1(channel) % this.pool.length; + return this.pool[i]; + } + } + + isChannelOnThisPartition(channel) { + return this.getPartitionIdentityForChannel(channel) === this.identity; + } +} + +export { PartitionDecider }; diff --git a/src/partition/partitionmodule.js b/src/partition/partitionmodule.js new file mode 100644 index 00000000..4c1adb2f --- /dev/null +++ b/src/partition/partitionmodule.js @@ -0,0 +1,63 @@ +import { loadFromToml } from 'cytube-common/lib/configuration/configloader'; +import { PartitionConfig } from './partitionconfig'; +import { PartitionDecider } from './partitiondecider'; +import { PartitionClusterClient } from '../io/cluster/partitionclusterclient'; +import logger from 'cytube-common/lib/logger'; +import LegacyConfig from '../config'; +import path from 'path'; + +const PARTITION_CONFIG_PATH = path.resolve(__dirname, '..', '..', 'conf', + 'partitions.toml'); + +class PartitionModule { + constructor() { + this.initConfig(); + } + + onReady() { + + } + + initConfig() { + logger.initialize(null, null, LegacyConfig.get('debug')); + try { + this.partitionConfig = this.loadPartitionMap(); + } catch (error) { + process.exit(1); + } + } + + loadPartitionMap() { + try { + return loadFromToml(PartitionConfig, PARTITION_CONFIG_PATH); + } catch (error) { + if (typeof error.line !== 'undefined') { + logger.error(`Error in ${PARTITION_CONFIG_PATH}: ${error} ` + + `(line ${error.line})`); + } else { + logger.error(`Error loading ${PARTITION_CONFIG_PATH}: ` + + `${error.stack}`); + } + throw error; + } + } + + getPartitionDecider() { + if (!this.partitionDecider) { + this.partitionDecider = new PartitionDecider(this.partitionConfig); + } + + return this.partitionDecider; + } + + getClusterClient() { + if (!this.partitionClusterClient) { + this.partitionClusterClient = new PartitionClusterClient( + this.getPartitionDecider()); + } + + return this.partitionClusterClient; + } +} + +export { PartitionModule }; diff --git a/src/server.js b/src/server.js index 91ac7a2f..a131e176 100644 --- a/src/server.js +++ b/src/server.js @@ -48,6 +48,7 @@ import WebConfiguration from './configuration/webconfig'; import NullClusterClient from './io/cluster/nullclusterclient'; import session from './session'; import { LegacyModule } from './legacymodule'; +import { PartitionModule } from './partition/partitionmodule'; import * as Switches from './switches'; var Server = function () { @@ -68,6 +69,8 @@ var Server = function () { } const BackendModule = require('./backend/backendmodule').BackendModule; initModule = new BackendModule(); + } else if (Config.get('enable-partition')) { + initModule = new PartitionModule(); } else { initModule = new LegacyModule(); } diff --git a/src/util/murmur.js b/src/util/murmur.js new file mode 100644 index 00000000..60416891 --- /dev/null +++ b/src/util/murmur.js @@ -0,0 +1,35 @@ +const SEED = 0x1234; +const M = 0xc6a4a793; +const R = 16; + +export function murmurHash1(str) { + const buffer = new Buffer(str, 'utf8'); + var length = buffer.length; + var h = SEED ^ (length * M); + + while (length >= 4) { + var k = buffer.readUInt32LE(buffer.length - length); + h += k; + h *= M; + h ^= h >> 16; + length -= 4; + } + + switch (length) { + case 3: + h += buffer[buffer.length - 3] >> 16; + case 2: + h += buffer[buffer.length - 2] >> 8; + case 1: + h += buffer[buffer.length - 1]; + h *= M; + h ^= h >> R; + } + + h *= M; + h ^= h >> 10; + h *= M; + h ^= h >> 17; + + return h; +} From a360cd8808ba5e62c9e888e3036db49990a44846 Mon Sep 17 00:00:00 2001 From: calzoneman Date: Tue, 7 Jun 2016 22:47:49 -0700 Subject: [PATCH 2/7] Reject joins for channels mapped to other partitions --- src/server.js | 10 +++++++++- src/user.js | 15 ++++++++++++++- 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/src/server.js b/src/server.js index a131e176..95f5d3bd 100644 --- a/src/server.js +++ b/src/server.js @@ -71,6 +71,7 @@ var Server = function () { initModule = new BackendModule(); } else if (Config.get('enable-partition')) { initModule = new PartitionModule(); + self.partitionDecider = initModule.getPartitionDecider(); } else { initModule = new LegacyModule(); } @@ -186,8 +187,15 @@ Server.prototype.isChannelLoaded = function (name) { }; Server.prototype.getChannel = function (name) { - var self = this; var cname = name.toLowerCase(); + if (this.partitionDecider && + !this.partitionDecider.isChannelOnThisPartition(cname)) { + const error = new Error(`Channel '${cname}' is mapped to a different partition`); + error.code = 'EWRONGPART'; + throw error; + } + + var self = this; for (var i = 0; i < self.channels.length; i++) { if (self.channels[i].uniqueName === cname) return self.channels[i]; diff --git a/src/user.js b/src/user.js index b779a672..bbb6d7e9 100644 --- a/src/user.js +++ b/src/user.js @@ -53,7 +53,20 @@ function User(socket) { } self.waitFlag(Flags.U_READY, function () { - var chan = Server.getServer().getChannel(data.name); + var chan; + try { + chan = Server.getServer().getChannel(data.name); + } catch (error) { + if (error.code !== 'EWRONGPART') { + throw error; + } + + self.socket.emit("errorMsg", { + msg: "Channel '" + data.name + "' is hosted on another server. " + + "Try refreshing the page to update the connection URL." + }); + return; + } chan.joinUser(self, data); }); }); From 7faf2829b268b77498f4a56b26b6fc4aa58fad14 Mon Sep 17 00:00:00 2001 From: calzoneman Date: Tue, 7 Jun 2016 23:00:50 -0700 Subject: [PATCH 3/7] Improve clientside socket.io connection error reporting --- templates/channel.jade | 2 +- www/js/callbacks.js | 29 ++++++++++++++++++++++++----- www/js/data.js | 1 + 3 files changed, 26 insertions(+), 6 deletions(-) diff --git a/templates/channel.jade b/templates/channel.jade index 3c279578..561a0d4c 100644 --- a/templates/channel.jade +++ b/templates/channel.jade @@ -236,7 +236,7 @@ html(lang="en") #pmbar include footer mixin footer() - script(src=sioSource) + script(id="socketio-js", src=sioSource) script(src="/js/data.js") script(src="/js/util.js") script(src="/js/player.js") diff --git a/www/js/callbacks.js b/www/js/callbacks.js index 5c862d50..eb716e41 100644 --- a/www/js/callbacks.js +++ b/www/js/callbacks.js @@ -6,7 +6,8 @@ Callbacks = { /* fired when socket connection completes */ connect: function() { - socket.emit("initChannelCallbacks"); + SOCKETIO_CONNECT_ERROR_COUNT = 0; + $("#socketio-connect-error").remove(); socket.emit("joinChannel", { name: CHANNEL.name }); @@ -1050,14 +1051,32 @@ setupCallbacks = function() { }); })(key); } + + socket.on("connect_error", function (error) { + SOCKETIO_CONNECT_ERROR_COUNT++; + if (SOCKETIO_CONNECT_ERROR_COUNT >= 3 && + $("#socketio-connect-error").length === 0) { + var message = "Failed to connect to the server. Try clearing your " + + "cache and refreshing the page."; + makeAlert("Error", message, "alert-danger") + .attr("id", "socketio-connect-error") + .appendTo($("#announcements")); + } + }); }; (function () { if (typeof io === "undefined") { - makeAlert("Uh oh!", "It appears the socket.io connection " + - "has failed. If this error persists, a firewall or " + - "antivirus is likely blocking the connection, or the " + - "server is down.", "alert-danger") + var script = document.getElementById("socketio-js"); + var source = "unknown"; + if (script) { + source = script.src; + } + + var message = "The socket.io library could not be loaded from " + + source + ". Ensure that it is not being blocked " + + "by a script blocking extension or firewall and try again."; + makeAlert("Error", message, "alert-danger") .appendTo($("#announcements")); Callbacks.disconnect(); return; diff --git a/www/js/data.js b/www/js/data.js index 5dbfacb7..ab9a5fce 100644 --- a/www/js/data.js +++ b/www/js/data.js @@ -65,6 +65,7 @@ var PL_WAIT_SCROLL = false; var FILTER_FROM = 0; var FILTER_TO = 0; var NO_STORAGE = typeof localStorage == "undefined" || localStorage === null; +var SOCKETIO_CONNECT_ERROR_COUNT = 0; function getOpt(k) { var v = NO_STORAGE ? readCookie(k) : localStorage.getItem(k); From 6e772c6837ae2209407cae6d5265638f269fdf3c Mon Sep 17 00:00:00 2001 From: calzoneman Date: Wed, 8 Jun 2016 22:54:16 -0700 Subject: [PATCH 4/7] Add partition map reload --- index.js | 2 + src/io/cluster/partitionclusterclient.js | 5 +- src/partition/partitiondecider.js | 21 +++--- src/server.js | 46 +++++++++++- www/js/callbacks.js | 92 +++++++++++++----------- 5 files changed, 109 insertions(+), 57 deletions(-) diff --git a/index.js b/index.js index fb8e3cc0..729d6cb4 100644 --- a/index.js +++ b/index.js @@ -73,5 +73,7 @@ function handleLine(line) { Logger.syslog.log("Switch " + args[0] + " is now " + (Switches.isActive(args[0]) ? "ON" : "OFF")); } + } else if (line.indexOf("/reload-partitions") === 0) { + sv.reloadPartitionMap(); } } diff --git a/src/io/cluster/partitionclusterclient.js b/src/io/cluster/partitionclusterclient.js index 9d5324ef..13d29ff2 100644 --- a/src/io/cluster/partitionclusterclient.js +++ b/src/io/cluster/partitionclusterclient.js @@ -6,9 +6,8 @@ class PartitionClusterClient { } getSocketConfig(channel) { - return Promise.resolve({ - servers: this.partitionDecider.getPartitionForChannel(channel) - }); + return Promise.resolve( + this.partitionDecider.getPartitionForChannel(channel)); } } diff --git a/src/partition/partitiondecider.js b/src/partition/partitiondecider.js index 425d6207..b9677d95 100644 --- a/src/partition/partitiondecider.js +++ b/src/partition/partitiondecider.js @@ -2,27 +2,28 @@ import { murmurHash1 } from '../util/murmur'; class PartitionDecider { constructor(config) { - this.identity = config.getIdentity(); - this.partitionMap = config.getPartitionMap(); - this.pool = config.getPool(); - this.overrideMap = config.getOverrideMap(); + this.config = config; } getPartitionForChannel(channel) { - return this.partitionMap[this.getPartitionIdentityForChannel(channel)]; + const partitionMap = this.config.getPartitionMap(); + return partitionMap[this.getPartitionIdentityForChannel(channel)]; } getPartitionIdentityForChannel(channel) { - if (this.overrideMap.hasOwnProperty(channel)) { - return this.overrideMap[channel]; + const overrideMap = this.config.getOverrideMap(); + if (overrideMap.hasOwnProperty(channel)) { + return overrideMap[channel]; } else { - const i = murmurHash1(channel) % this.pool.length; - return this.pool[i]; + const pool = this.config.getPool(); + const i = murmurHash1(channel) % pool.length; + return pool[i]; } } isChannelOnThisPartition(channel) { - return this.getPartitionIdentityForChannel(channel) === this.identity; + return this.getPartitionIdentityForChannel(channel) === + this.config.getIdentity(); } } diff --git a/src/server.js b/src/server.js index 95f5d3bd..97e57d52 100644 --- a/src/server.js +++ b/src/server.js @@ -68,12 +68,12 @@ var Server = function () { Switches.setActive(Switches.DUAL_BACKEND, true); } const BackendModule = require('./backend/backendmodule').BackendModule; - initModule = new BackendModule(); + initModule = this.initModule = new BackendModule(); } else if (Config.get('enable-partition')) { - initModule = new PartitionModule(); + initModule = this.initModule = new PartitionModule(); self.partitionDecider = initModule.getPartitionDecider(); } else { - initModule = new LegacyModule(); + initModule = this.initModule = new LegacyModule(); } // database init ------------------------------------------------------ @@ -302,3 +302,43 @@ Server.prototype.shutdown = function () { process.exit(1); }); }; + +Server.prototype.reloadPartitionMap = function () { + if (!Config.get("enable-partition")) { + return; + } + + var config; + try { + config = this.initModule.loadPartitionMap(); + } catch (error) { + return; + } + + this.initModule.partitionConfig.config = config.config; + + const channels = Array.prototype.slice.call(this.channels); + Promise.reduce(channels, (_, channel) => { + if (channel.dead) { + return; + } + + if (!this.partitionDecider.isChannelOnThisPartition(channel.uniqueName)) { + Logger.syslog.log("Partition changed for " + channel.uniqueName); + return channel.saveState().then(() => { + channel.broadcastAll("partitionChange", + this.partitionDecider.getPartitionForChannel(channel.uniqueName)); + const users = Array.prototype.slice.call(channel.users); + users.forEach(u => { + try { + u.socket.disconnect(); + } catch (error) { + } + }); + this.unloadChannel(channel); + }); + } + }, 0).then(() => { + Logger.syslog.log("Partition reload complete"); + }); +}; diff --git a/www/js/callbacks.js b/www/js/callbacks.js index eb716e41..0794b40c 100644 --- a/www/js/callbacks.js +++ b/www/js/callbacks.js @@ -1030,6 +1030,12 @@ Callbacks = { "unneeded playlist items, filters, and/or emotes. Changes to the channel " + "will not be saved until the size is reduced to under the limit.") .attr("id", "chandumptoobig"); + }, + + partitionChange: function (socketConfig) { + window.socket.disconnect(); + ioServerConnect(socketConfig); + setupCallbacks(); } } @@ -1065,6 +1071,50 @@ setupCallbacks = function() { }); }; +function ioServerConnect(socketConfig) { + if (socketConfig.error) { + makeAlert("Error", "Socket.io configuration returned error: " + + socketConfig.error, "alert-danger") + .appendTo($("#announcements")); + return; + } + + var servers; + if (socketConfig.alt && socketConfig.alt.length > 0 && + localStorage.useAltServer === "true") { + servers = socketConfig.alt; + console.log("Using alt servers: " + JSON.stringify(servers)); + } else { + servers = socketConfig.servers; + } + + var chosenServer = null; + servers.forEach(function (server) { + if (chosenServer === null) { + chosenServer = server; + } else if (server.secure && !chosenServer.secure) { + chosenServer = server; + } else if (!server.ipv6Only && chosenServer.ipv6Only) { + chosenServer = server; + } + }); + + console.log("Connecting to " + JSON.stringify(chosenServer)); + + if (chosenServer === null) { + makeAlert("Error", + "Socket.io configuration was unable to find a suitable server", + "alert-danger") + .appendTo($("#announcements")); + } + + var opts = { + secure: chosenServer.secure + }; + + window.socket = io(chosenServer.url, opts); +} + (function () { if (typeof io === "undefined") { var script = document.getElementById("socketio-js"); @@ -1084,47 +1134,7 @@ setupCallbacks = function() { $.getJSON("/socketconfig/" + CHANNEL.name + ".json") .done(function (socketConfig) { - if (socketConfig.error) { - makeAlert("Error", "Socket.io configuration returned error: " + - socketConfig.error, "alert-danger") - .appendTo($("#announcements")); - return; - } - - var servers; - if (socketConfig.alt && socketConfig.alt.length > 0 && - localStorage.useAltServer === "true") { - servers = socketConfig.alt; - console.log("Using alt servers: " + JSON.stringify(servers)); - } else { - servers = socketConfig.servers; - } - - var chosenServer = null; - servers.forEach(function (server) { - if (chosenServer === null) { - chosenServer = server; - } else if (server.secure && !chosenServer.secure) { - chosenServer = server; - } else if (!server.ipv6Only && chosenServer.ipv6Only) { - chosenServer = server; - } - }); - - console.log("Connecting to " + JSON.stringify(chosenServer)); - - if (chosenServer === null) { - makeAlert("Error", - "Socket.io configuration was unable to find a suitable server", - "alert-danger") - .appendTo($("#announcements")); - } - - var opts = { - secure: chosenServer.secure - }; - - socket = io(chosenServer.url, opts); + ioServerConnect(socketConfig); setupCallbacks(); }).fail(function () { makeAlert("Error", "Failed to retrieve socket.io configuration", From 5b9948f7090485be182c691283028c61464eea45 Mon Sep 17 00:00:00 2001 From: calzoneman Date: Wed, 8 Jun 2016 22:58:34 -0700 Subject: [PATCH 5/7] Omit the connection warning if the socket connected at least once before --- www/js/callbacks.js | 10 ++++++++++ www/js/data.js | 1 + 2 files changed, 11 insertions(+) diff --git a/www/js/callbacks.js b/www/js/callbacks.js index 0794b40c..caa6da21 100644 --- a/www/js/callbacks.js +++ b/www/js/callbacks.js @@ -6,6 +6,7 @@ Callbacks = { /* fired when socket connection completes */ connect: function() { + HAS_CONNECTED_BEFORE = true; SOCKETIO_CONNECT_ERROR_COUNT = 0; $("#socketio-connect-error").remove(); socket.emit("joinChannel", { @@ -1034,6 +1035,7 @@ Callbacks = { partitionChange: function (socketConfig) { window.socket.disconnect(); + HAS_CONNECTED_BEFORE = false; ioServerConnect(socketConfig); setupCallbacks(); } @@ -1059,6 +1061,14 @@ setupCallbacks = function() { } socket.on("connect_error", function (error) { + // If the socket has connected at least once during this + // session and now gets a connect error, it is likely because + // the server is down temporarily and not because of any configuration + // issue. Therefore, omit the warning message about refreshing. + if (HAS_CONNECTED_BEFORE) { + return; + } + SOCKETIO_CONNECT_ERROR_COUNT++; if (SOCKETIO_CONNECT_ERROR_COUNT >= 3 && $("#socketio-connect-error").length === 0) { diff --git a/www/js/data.js b/www/js/data.js index ab9a5fce..1d348208 100644 --- a/www/js/data.js +++ b/www/js/data.js @@ -66,6 +66,7 @@ var FILTER_FROM = 0; var FILTER_TO = 0; var NO_STORAGE = typeof localStorage == "undefined" || localStorage === null; var SOCKETIO_CONNECT_ERROR_COUNT = 0; +var HAS_CONNECTED_BEFORE = false; function getOpt(k) { var v = NO_STORAGE ? readCookie(k) : localStorage.getItem(k); From b6bb0aa56dfccab6460e54c262352adf40a73bd4 Mon Sep 17 00:00:00 2001 From: calzoneman Date: Thu, 9 Jun 2016 23:42:30 -0700 Subject: [PATCH 6/7] Add redis-based channel index --- package.json | 4 +- src/partition/partitionchannelindex.js | 62 ++++++++++++++++++++++++++ src/partition/partitionconfig.js | 4 ++ src/partition/partitionmodule.js | 11 +++++ src/partition/read_channel_list.lua | 30 +++++++++++++ src/server.js | 10 ++++- 6 files changed, 118 insertions(+), 3 deletions(-) create mode 100644 src/partition/partitionchannelindex.js create mode 100644 src/partition/read_channel_list.lua diff --git a/package.json b/package.json index b0405ab4..44d49b98 100644 --- a/package.json +++ b/package.json @@ -44,9 +44,9 @@ }, "scripts": { "build-player": "$npm_node_execpath build-player.js", - "build-server": "babel --source-maps --loose es6.destructuring,es6.forOf --out-dir lib/ src/", + "build-server": "babel -D --source-maps --loose es6.destructuring,es6.forOf --out-dir lib/ src/", "postinstall": "./postinstall.sh", - "server-dev": "babel --watch --source-maps --loose es6.destructuring,es6.forOf --out-dir lib/ src/" + "server-dev": "babel -D --watch --source-maps --loose es6.destructuring,es6.forOf --out-dir lib/ src/" }, "devDependencies": { "coffee-script": "^1.9.2" diff --git a/src/partition/partitionchannelindex.js b/src/partition/partitionchannelindex.js new file mode 100644 index 00000000..313e9f32 --- /dev/null +++ b/src/partition/partitionchannelindex.js @@ -0,0 +1,62 @@ +import Promise from 'bluebird'; +import uuid from 'uuid'; +import { runLuaScript } from 'cytube-common/lib/redis/lualoader'; +import path from 'path'; +import Logger from '../logger'; + +var SERVER = null; +const CHANNEL_INDEX = 'publicChannelList'; +const CACHE_REFRESH_INTERVAL = 30 * 1000; +const CACHE_EXPIRE_DELAY = 40 * 1000; +const READ_CHANNEL_LIST = path.join(__dirname, 'read_channel_list.lua') + +class PartitionChannelIndex { + constructor(redisClient) { + this.redisClient = redisClient; + this.uid = uuid.v4(); + this.cachedList = []; + process.nextTick(() => { + SERVER = require('../server').getServer(); + this.refreshCache(); + setInterval(this.refreshCache.bind(this), CACHE_REFRESH_INTERVAL); + }); + } + + refreshCache() { + this.publishLocalChannels(); + runLuaScript(this.redisClient, READ_CHANNEL_LIST, [ + 0, + Date.now() - CACHE_EXPIRE_DELAY + ]).then(result => { + this.cachedList = JSON.parse(result); + }).catch(error => { + Logger.errlog.log(`Failed to refresh channel list: ${error.stack}`); + }); + } + + publishLocalChannels() { + const channels = SERVER.packChannelList(true).map(channel => { + return { + name: channel.name, + mediatitle: channel.mediatitle, + pagetitle: channel.pagetitle, + usercount: channel.usercount + }; + }); + + const entry = JSON.stringify({ + timestamp: Date.now(), + channels + }); + + this.redisClient.hsetAsync(CHANNEL_INDEX, this.uid, entry).catch(error => { + Logger.errlog.log(`Failed to publish local channel list: ${error.stack}`); + }); + } + + listPublicChannels() { + return Promise.resolve(this.cachedList); + } +} + +export { PartitionChannelIndex }; diff --git a/src/partition/partitionconfig.js b/src/partition/partitionconfig.js index 7684b95e..b9ec4cdd 100644 --- a/src/partition/partitionconfig.js +++ b/src/partition/partitionconfig.js @@ -18,6 +18,10 @@ class PartitionConfig { getIdentity() { return this.config.identity; } + + getRedisConfig() { + return this.config.redis; + } } export { PartitionConfig }; diff --git a/src/partition/partitionmodule.js b/src/partition/partitionmodule.js index 4c1adb2f..5a5188eb 100644 --- a/src/partition/partitionmodule.js +++ b/src/partition/partitionmodule.js @@ -2,6 +2,7 @@ import { loadFromToml } from 'cytube-common/lib/configuration/configloader'; import { PartitionConfig } from './partitionconfig'; import { PartitionDecider } from './partitiondecider'; import { PartitionClusterClient } from '../io/cluster/partitionclusterclient'; +import RedisClientProvider from 'cytube-common/lib/redis/redisclientprovider'; import logger from 'cytube-common/lib/logger'; import LegacyConfig from '../config'; import path from 'path'; @@ -58,6 +59,16 @@ class PartitionModule { return this.partitionClusterClient; } + + getRedisClientProvider() { + if (!this.redisClientProvider) { + this.redisClientProvider = new RedisClientProvider( + this.partitionConfig.getRedisConfig() + ); + } + + return this.redisClientProvider; + } } export { PartitionModule }; diff --git a/src/partition/read_channel_list.lua b/src/partition/read_channel_list.lua new file mode 100644 index 00000000..c26cdf65 --- /dev/null +++ b/src/partition/read_channel_list.lua @@ -0,0 +1,30 @@ +local entries = redis.call('hgetall', 'publicChannelList') +if #entries == 0 then + return '[]' +end + +local channelList = {} +-- ARGV[1] holds the expiration timestamp. Anything older than this +-- will be discarded. +local expiration = tonumber(ARGV[1]) +for i = 1, #entries, 2 do + local uid = entries[i] + local entry = cjson.decode(entries[i+1]) + local timestamp = tonumber(entry['timestamp']) + if timestamp < expiration then + redis.call('hdel', 'publicChannelList', uid) + else + local channels = entry['channels'] + for j = 1, #channels do + channelList[#channelList+1] = channels[j] + end + end +end + +-- Necessary to check for this condition because +-- if the table is empty, cjson will encode it as an object ('{}') +if #channelList == 0 then + return '[]' +else + return cjson.encode(channelList) +end diff --git a/src/server.js b/src/server.js index 97e57d52..1567d9b3 100644 --- a/src/server.js +++ b/src/server.js @@ -43,6 +43,7 @@ var db = require("./database"); var Flags = require("./flags"); var sio = require("socket.io"); import LocalChannelIndex from './web/localchannelindex'; +import { PartitionChannelIndex } from './partition/partitionchannelindex'; import IOConfiguration from './configuration/ioconfig'; import WebConfiguration from './configuration/webconfig'; import NullClusterClient from './io/cluster/nullclusterclient'; @@ -86,7 +87,14 @@ var Server = function () { const ioConfig = IOConfiguration.fromOldConfig(Config); const webConfig = WebConfiguration.fromOldConfig(Config); const clusterClient = initModule.getClusterClient(); - const channelIndex = new LocalChannelIndex(); + var channelIndex; + if (Config.get("enable-partition")) { + channelIndex = new PartitionChannelIndex( + initModule.getRedisClientProvider().get() + ); + } else { + channelIndex = new LocalChannelIndex(); + } self.express = express(); require("./web/webserver").init(self.express, webConfig, From 77d84d5b76c9bba6629eae6f30f9da65098dba9f Mon Sep 17 00:00:00 2001 From: calzoneman Date: Mon, 13 Jun 2016 23:09:27 -0700 Subject: [PATCH 7/7] Add redis client error listener --- package.json | 2 +- src/partition/partitionchannelindex.js | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/package.json b/package.json index 44d49b98..4e28f4b1 100644 --- a/package.json +++ b/package.json @@ -2,7 +2,7 @@ "author": "Calvin Montgomery", "name": "CyTube", "description": "Online media synchronizer and chat", - "version": "3.16.1", + "version": "3.17.0", "repository": { "url": "http://github.com/calzoneman/sync" }, diff --git a/src/partition/partitionchannelindex.js b/src/partition/partitionchannelindex.js index 313e9f32..c23a650c 100644 --- a/src/partition/partitionchannelindex.js +++ b/src/partition/partitionchannelindex.js @@ -15,6 +15,10 @@ class PartitionChannelIndex { this.redisClient = redisClient; this.uid = uuid.v4(); this.cachedList = []; + this.redisClient.on('error', error => { + Logger.errlog.log(`Redis error: ${error}`); + }); + process.nextTick(() => { SERVER = require('../server').getServer(); this.refreshCache();