diff --git a/package.json b/package.json index b0405ab4..44d49b98 100644 --- a/package.json +++ b/package.json @@ -44,9 +44,9 @@ }, "scripts": { "build-player": "$npm_node_execpath build-player.js", - "build-server": "babel --source-maps --loose es6.destructuring,es6.forOf --out-dir lib/ src/", + "build-server": "babel -D --source-maps --loose es6.destructuring,es6.forOf --out-dir lib/ src/", "postinstall": "./postinstall.sh", - "server-dev": "babel --watch --source-maps --loose es6.destructuring,es6.forOf --out-dir lib/ src/" + "server-dev": "babel -D --watch --source-maps --loose es6.destructuring,es6.forOf --out-dir lib/ src/" }, "devDependencies": { "coffee-script": "^1.9.2" diff --git a/src/partition/partitionchannelindex.js b/src/partition/partitionchannelindex.js new file mode 100644 index 00000000..313e9f32 --- /dev/null +++ b/src/partition/partitionchannelindex.js @@ -0,0 +1,62 @@ +import Promise from 'bluebird'; +import uuid from 'uuid'; +import { runLuaScript } from 'cytube-common/lib/redis/lualoader'; +import path from 'path'; +import Logger from '../logger'; + +var SERVER = null; +const CHANNEL_INDEX = 'publicChannelList'; +const CACHE_REFRESH_INTERVAL = 30 * 1000; +const CACHE_EXPIRE_DELAY = 40 * 1000; +const READ_CHANNEL_LIST = path.join(__dirname, 'read_channel_list.lua') + +class PartitionChannelIndex { + constructor(redisClient) { + this.redisClient = redisClient; + this.uid = uuid.v4(); + this.cachedList = []; + process.nextTick(() => { + SERVER = require('../server').getServer(); + this.refreshCache(); + setInterval(this.refreshCache.bind(this), CACHE_REFRESH_INTERVAL); + }); + } + + refreshCache() { + this.publishLocalChannels(); + runLuaScript(this.redisClient, READ_CHANNEL_LIST, [ + 0, + Date.now() - CACHE_EXPIRE_DELAY + ]).then(result => { + this.cachedList = JSON.parse(result); + }).catch(error => { + Logger.errlog.log(`Failed to refresh channel list: ${error.stack}`); + }); + } + + publishLocalChannels() { + const channels = SERVER.packChannelList(true).map(channel => { + return { + name: channel.name, + mediatitle: channel.mediatitle, + pagetitle: channel.pagetitle, + usercount: channel.usercount + }; + }); + + const entry = JSON.stringify({ + timestamp: Date.now(), + channels + }); + + this.redisClient.hsetAsync(CHANNEL_INDEX, this.uid, entry).catch(error => { + Logger.errlog.log(`Failed to publish local channel list: ${error.stack}`); + }); + } + + listPublicChannels() { + return Promise.resolve(this.cachedList); + } +} + +export { PartitionChannelIndex }; diff --git a/src/partition/partitionconfig.js b/src/partition/partitionconfig.js index 7684b95e..b9ec4cdd 100644 --- a/src/partition/partitionconfig.js +++ b/src/partition/partitionconfig.js @@ -18,6 +18,10 @@ class PartitionConfig { getIdentity() { return this.config.identity; } + + getRedisConfig() { + return this.config.redis; + } } export { PartitionConfig }; diff --git a/src/partition/partitionmodule.js b/src/partition/partitionmodule.js index 4c1adb2f..5a5188eb 100644 --- a/src/partition/partitionmodule.js +++ b/src/partition/partitionmodule.js @@ -2,6 +2,7 @@ import { loadFromToml } from 'cytube-common/lib/configuration/configloader'; import { PartitionConfig } from './partitionconfig'; import { PartitionDecider } from './partitiondecider'; import { PartitionClusterClient } from '../io/cluster/partitionclusterclient'; +import RedisClientProvider from 'cytube-common/lib/redis/redisclientprovider'; import logger from 'cytube-common/lib/logger'; import LegacyConfig from '../config'; import path from 'path'; @@ -58,6 +59,16 @@ class PartitionModule { return this.partitionClusterClient; } + + getRedisClientProvider() { + if (!this.redisClientProvider) { + this.redisClientProvider = new RedisClientProvider( + this.partitionConfig.getRedisConfig() + ); + } + + return this.redisClientProvider; + } } export { PartitionModule }; diff --git a/src/partition/read_channel_list.lua b/src/partition/read_channel_list.lua new file mode 100644 index 00000000..c26cdf65 --- /dev/null +++ b/src/partition/read_channel_list.lua @@ -0,0 +1,30 @@ +local entries = redis.call('hgetall', 'publicChannelList') +if #entries == 0 then + return '[]' +end + +local channelList = {} +-- ARGV[1] holds the expiration timestamp. Anything older than this +-- will be discarded. +local expiration = tonumber(ARGV[1]) +for i = 1, #entries, 2 do + local uid = entries[i] + local entry = cjson.decode(entries[i+1]) + local timestamp = tonumber(entry['timestamp']) + if timestamp < expiration then + redis.call('hdel', 'publicChannelList', uid) + else + local channels = entry['channels'] + for j = 1, #channels do + channelList[#channelList+1] = channels[j] + end + end +end + +-- Necessary to check for this condition because +-- if the table is empty, cjson will encode it as an object ('{}') +if #channelList == 0 then + return '[]' +else + return cjson.encode(channelList) +end diff --git a/src/server.js b/src/server.js index 97e57d52..1567d9b3 100644 --- a/src/server.js +++ b/src/server.js @@ -43,6 +43,7 @@ var db = require("./database"); var Flags = require("./flags"); var sio = require("socket.io"); import LocalChannelIndex from './web/localchannelindex'; +import { PartitionChannelIndex } from './partition/partitionchannelindex'; import IOConfiguration from './configuration/ioconfig'; import WebConfiguration from './configuration/webconfig'; import NullClusterClient from './io/cluster/nullclusterclient'; @@ -86,7 +87,14 @@ var Server = function () { const ioConfig = IOConfiguration.fromOldConfig(Config); const webConfig = WebConfiguration.fromOldConfig(Config); const clusterClient = initModule.getClusterClient(); - const channelIndex = new LocalChannelIndex(); + var channelIndex; + if (Config.get("enable-partition")) { + channelIndex = new PartitionChannelIndex( + initModule.getRedisClientProvider().get() + ); + } else { + channelIndex = new LocalChannelIndex(); + } self.express = express(); require("./web/webserver").init(self.express, webConfig,