From a9a644460fd0dddf9cfb7a5773eead2c21a249da Mon Sep 17 00:00:00 2001 From: Calvin Montgomery Date: Sat, 18 Aug 2018 12:27:24 -0700 Subject: [PATCH] Fix #760 --- package.json | 2 +- src/partition/partitionchannelindex.js | 143 ++++++++++++++++++++----- src/partition/partitionconfig.js | 4 + src/partition/read_channel_list.lua | 30 ------ src/server.js | 4 +- 5 files changed, 122 insertions(+), 61 deletions(-) delete mode 100644 src/partition/read_channel_list.lua diff --git a/package.json b/package.json index 173f556d..6c5ed614 100644 --- a/package.json +++ b/package.json @@ -2,7 +2,7 @@ "author": "Calvin Montgomery", "name": "CyTube", "description": "Online media synchronizer and chat", - "version": "3.56.6", + "version": "3.57.0", "repository": { "url": "http://github.com/calzoneman/sync" }, diff --git a/src/partition/partitionchannelindex.js b/src/partition/partitionchannelindex.js index deafe4a3..dc9b5865 100644 --- a/src/partition/partitionchannelindex.js +++ b/src/partition/partitionchannelindex.js @@ -1,45 +1,98 @@ import Promise from 'bluebird'; import uuid from 'uuid'; -import { runLuaScript } from '../redis/lualoader'; import path from 'path'; const LOGGER = require('@calzoneman/jsli')('partitionchannelindex'); var SERVER = null; -const CHANNEL_INDEX = 'publicChannelList'; const CACHE_REFRESH_INTERVAL = 30 * 1000; const CACHE_EXPIRE_DELAY = 40 * 1000; -const READ_CHANNEL_LIST = path.join(__dirname, 'read_channel_list.lua'); class PartitionChannelIndex { - constructor(redisClient) { - this.redisClient = redisClient; - this.uid = uuid.v4(); - this.cachedList = []; - this.redisClient.on('error', error => { - LOGGER.error(`Redis error: ${error}`); + constructor(pubClient, subClient, channel) { + this.id = uuid.v4(); + this.pubClient = pubClient; + this.subClient = subClient; + this.channel = channel; + this.id2instance = new Map(); + this._cache = []; + + this.pubClient.on('error', error => { + LOGGER.error('pubClient error: %s', error.stack); + }); + this.subClient.on('error', error => { + LOGGER.error('subClient error: %s', error.stack); }); - process.nextTick(() => { - SERVER = require('../server').getServer(); - this.refreshCache(); - setInterval(this.refreshCache.bind(this), CACHE_REFRESH_INTERVAL); + this.subClient.once('ready', () => { + this.subClient.on( + 'message', + (channel, message) => this._handleMessage(channel, message) + ); + this.subClient.subscribe(this.channel); + this._bootstrap(); }); } - refreshCache() { - this.publishLocalChannels(); - runLuaScript(this.redisClient, READ_CHANNEL_LIST, [ - 0, - Date.now() - CACHE_EXPIRE_DELAY - ]).then(result => { - this.cachedList = JSON.parse(result); - }).catch(error => { - LOGGER.error(`Failed to refresh channel list: ${error.stack}`); + _bootstrap() { + LOGGER.info('Bootstrapping partition channel index (id=%s)', this.id); + SERVER = require('../server').getServer(); + setInterval(() => this._broadcastMyList(), CACHE_REFRESH_INTERVAL); + + const bootstrap = JSON.stringify({ + operation: 'bootstrap', + instanceId: this.id, + payload: {} }); + + this.pubClient.publishAsync(this.channel, bootstrap).catch(error => { + LOGGER.error('Failed to send bootstrap request: %s', error.stack); + }); + + this._broadcastMyList(); } - publishLocalChannels() { + _handleMessage(channel, message) { + if (channel !== this.channel) { + LOGGER.warn('Unexpected message from channel "%s"', channel); + return; + } + + try { + const { operation, instanceId, payload } = JSON.parse(message); + if (instanceId === this.id) { + return; + } + + switch (operation) { + case 'bootstrap': + LOGGER.info( + 'Received bootstrap request from %s', + instanceId + ); + this._broadcastMyList(); + break; + case 'put-list': + LOGGER.info( + 'Received put-list request from %s', + instanceId + ); + this._putList(instanceId, payload); + break; + default: + LOGGER.warn( + 'Unknown channel index sync operation "%s" from %s', + operation, + instanceId + ); + break; + } + } catch (error) { + LOGGER.error('Error handling channel index sync message: %s', error.stack); + } + } + + _broadcastMyList() { const channels = SERVER.packChannelList(true).map(channel => { return { name: channel.name, @@ -49,18 +102,50 @@ class PartitionChannelIndex { }; }); - const entry = JSON.stringify({ - timestamp: Date.now(), - channels + this._putList(this.id, { channels }); + + const message = JSON.stringify({ + operation: 'put-list', + instanceId: this.id, + payload: { + channels + } }); - this.redisClient.hsetAsync(CHANNEL_INDEX, this.uid, entry).catch(error => { - LOGGER.error(`Failed to publish local channel list: ${error.stack}`); + this.pubClient.publishAsync(this.channel, message).catch(error => { + LOGGER.error('Failed to publish local channel list: %s', error.stack); }); } + _putList(instanceId, payload) { + const { channels } = payload; + this.id2instance.set( + instanceId, + { + lastUpdated: new Date(), + channels + } + ); + + this._updateCache(); + } + + _updateCache() { + let cache = []; + for (let [id, instance] of this.id2instance) { + if (Date.now() - instance.lastUpdated.getTime() > CACHE_EXPIRE_DELAY) { + LOGGER.warn('Removing expired channel list instance: %s', id); + this.id2instance.delete(id); + } else { + cache = cache.concat(instance.channels); + } + } + + this._cache = cache; + } + listPublicChannels() { - return Promise.resolve(this.cachedList); + return Promise.resolve(this._cache); } } diff --git a/src/partition/partitionconfig.js b/src/partition/partitionconfig.js index ea5a6c93..4db0673a 100644 --- a/src/partition/partitionconfig.js +++ b/src/partition/partitionconfig.js @@ -26,6 +26,10 @@ class PartitionConfig { getGlobalMessageBusChannel() { return this.config.redis.globalMessageBusChannel || 'globalMessages'; } + + getChannelIndexChannel() { + return this.config.redis.channelIndexChannel || 'channelIndexUpdates'; + } } export { PartitionConfig }; diff --git a/src/partition/read_channel_list.lua b/src/partition/read_channel_list.lua deleted file mode 100644 index c26cdf65..00000000 --- a/src/partition/read_channel_list.lua +++ /dev/null @@ -1,30 +0,0 @@ -local entries = redis.call('hgetall', 'publicChannelList') -if #entries == 0 then - return '[]' -end - -local channelList = {} --- ARGV[1] holds the expiration timestamp. Anything older than this --- will be discarded. -local expiration = tonumber(ARGV[1]) -for i = 1, #entries, 2 do - local uid = entries[i] - local entry = cjson.decode(entries[i+1]) - local timestamp = tonumber(entry['timestamp']) - if timestamp < expiration then - redis.call('hdel', 'publicChannelList', uid) - else - local channels = entry['channels'] - for j = 1, #channels do - channelList[#channelList+1] = channels[j] - end - end -end - --- Necessary to check for this condition because --- if the table is empty, cjson will encode it as an object ('{}') -if #channelList == 0 then - return '[]' -else - return cjson.encode(channelList) -end diff --git a/src/server.js b/src/server.js index 1ed26111..74aa0abe 100644 --- a/src/server.js +++ b/src/server.js @@ -122,7 +122,9 @@ var Server = function () { var channelIndex; if (Config.get("enable-partition")) { channelIndex = new PartitionChannelIndex( - initModule.getRedisClientProvider().get() + initModule.getRedisClientProvider().get(), + initModule.getRedisClientProvider().get(), + initModule.partitionConfig.getChannelIndexChannel() ); } else { channelIndex = new LocalChannelIndex();