This commit is contained in:
Calvin Montgomery 2018-08-18 12:27:24 -07:00
parent cb687fc078
commit a9a644460f
5 changed files with 122 additions and 61 deletions

View File

@ -2,7 +2,7 @@
"author": "Calvin Montgomery",
"name": "CyTube",
"description": "Online media synchronizer and chat",
"version": "3.56.6",
"version": "3.57.0",
"repository": {
"url": "http://github.com/calzoneman/sync"
},

View File

@ -1,45 +1,98 @@
import Promise from 'bluebird';
import uuid from 'uuid';
import { runLuaScript } from '../redis/lualoader';
import path from 'path';
const LOGGER = require('@calzoneman/jsli')('partitionchannelindex');
var SERVER = null;
const CHANNEL_INDEX = 'publicChannelList';
const CACHE_REFRESH_INTERVAL = 30 * 1000;
const CACHE_EXPIRE_DELAY = 40 * 1000;
const READ_CHANNEL_LIST = path.join(__dirname, 'read_channel_list.lua');
class PartitionChannelIndex {
constructor(redisClient) {
this.redisClient = redisClient;
this.uid = uuid.v4();
this.cachedList = [];
this.redisClient.on('error', error => {
LOGGER.error(`Redis error: ${error}`);
constructor(pubClient, subClient, channel) {
this.id = uuid.v4();
this.pubClient = pubClient;
this.subClient = subClient;
this.channel = channel;
this.id2instance = new Map();
this._cache = [];
this.pubClient.on('error', error => {
LOGGER.error('pubClient error: %s', error.stack);
});
this.subClient.on('error', error => {
LOGGER.error('subClient error: %s', error.stack);
});
process.nextTick(() => {
SERVER = require('../server').getServer();
this.refreshCache();
setInterval(this.refreshCache.bind(this), CACHE_REFRESH_INTERVAL);
this.subClient.once('ready', () => {
this.subClient.on(
'message',
(channel, message) => this._handleMessage(channel, message)
);
this.subClient.subscribe(this.channel);
this._bootstrap();
});
}
refreshCache() {
this.publishLocalChannels();
runLuaScript(this.redisClient, READ_CHANNEL_LIST, [
0,
Date.now() - CACHE_EXPIRE_DELAY
]).then(result => {
this.cachedList = JSON.parse(result);
}).catch(error => {
LOGGER.error(`Failed to refresh channel list: ${error.stack}`);
_bootstrap() {
LOGGER.info('Bootstrapping partition channel index (id=%s)', this.id);
SERVER = require('../server').getServer();
setInterval(() => this._broadcastMyList(), CACHE_REFRESH_INTERVAL);
const bootstrap = JSON.stringify({
operation: 'bootstrap',
instanceId: this.id,
payload: {}
});
this.pubClient.publishAsync(this.channel, bootstrap).catch(error => {
LOGGER.error('Failed to send bootstrap request: %s', error.stack);
});
this._broadcastMyList();
}
publishLocalChannels() {
_handleMessage(channel, message) {
if (channel !== this.channel) {
LOGGER.warn('Unexpected message from channel "%s"', channel);
return;
}
try {
const { operation, instanceId, payload } = JSON.parse(message);
if (instanceId === this.id) {
return;
}
switch (operation) {
case 'bootstrap':
LOGGER.info(
'Received bootstrap request from %s',
instanceId
);
this._broadcastMyList();
break;
case 'put-list':
LOGGER.info(
'Received put-list request from %s',
instanceId
);
this._putList(instanceId, payload);
break;
default:
LOGGER.warn(
'Unknown channel index sync operation "%s" from %s',
operation,
instanceId
);
break;
}
} catch (error) {
LOGGER.error('Error handling channel index sync message: %s', error.stack);
}
}
_broadcastMyList() {
const channels = SERVER.packChannelList(true).map(channel => {
return {
name: channel.name,
@ -49,18 +102,50 @@ class PartitionChannelIndex {
};
});
const entry = JSON.stringify({
timestamp: Date.now(),
channels
this._putList(this.id, { channels });
const message = JSON.stringify({
operation: 'put-list',
instanceId: this.id,
payload: {
channels
}
});
this.redisClient.hsetAsync(CHANNEL_INDEX, this.uid, entry).catch(error => {
LOGGER.error(`Failed to publish local channel list: ${error.stack}`);
this.pubClient.publishAsync(this.channel, message).catch(error => {
LOGGER.error('Failed to publish local channel list: %s', error.stack);
});
}
_putList(instanceId, payload) {
const { channels } = payload;
this.id2instance.set(
instanceId,
{
lastUpdated: new Date(),
channels
}
);
this._updateCache();
}
_updateCache() {
let cache = [];
for (let [id, instance] of this.id2instance) {
if (Date.now() - instance.lastUpdated.getTime() > CACHE_EXPIRE_DELAY) {
LOGGER.warn('Removing expired channel list instance: %s', id);
this.id2instance.delete(id);
} else {
cache = cache.concat(instance.channels);
}
}
this._cache = cache;
}
listPublicChannels() {
return Promise.resolve(this.cachedList);
return Promise.resolve(this._cache);
}
}

View File

@ -26,6 +26,10 @@ class PartitionConfig {
getGlobalMessageBusChannel() {
return this.config.redis.globalMessageBusChannel || 'globalMessages';
}
getChannelIndexChannel() {
return this.config.redis.channelIndexChannel || 'channelIndexUpdates';
}
}
export { PartitionConfig };

View File

@ -1,30 +0,0 @@
local entries = redis.call('hgetall', 'publicChannelList')
if #entries == 0 then
return '[]'
end
local channelList = {}
-- ARGV[1] holds the expiration timestamp. Anything older than this
-- will be discarded.
local expiration = tonumber(ARGV[1])
for i = 1, #entries, 2 do
local uid = entries[i]
local entry = cjson.decode(entries[i+1])
local timestamp = tonumber(entry['timestamp'])
if timestamp < expiration then
redis.call('hdel', 'publicChannelList', uid)
else
local channels = entry['channels']
for j = 1, #channels do
channelList[#channelList+1] = channels[j]
end
end
end
-- Necessary to check for this condition because
-- if the table is empty, cjson will encode it as an object ('{}')
if #channelList == 0 then
return '[]'
else
return cjson.encode(channelList)
end

View File

@ -122,7 +122,9 @@ var Server = function () {
var channelIndex;
if (Config.get("enable-partition")) {
channelIndex = new PartitionChannelIndex(
initModule.getRedisClientProvider().get()
initModule.getRedisClientProvider().get(),
initModule.getRedisClientProvider().get(),
initModule.partitionConfig.getChannelIndexChannel()
);
} else {
channelIndex = new LocalChannelIndex();