diff --git a/package.json b/package.json index 4cf70334..438478b0 100644 --- a/package.json +++ b/package.json @@ -2,7 +2,7 @@ "author": "Calvin Montgomery", "name": "CyTube", "description": "Online media synchronizer and chat", - "version": "3.46.2", + "version": "3.47.0", "repository": { "url": "http://github.com/calzoneman/sync" }, diff --git a/src/legacymodule.js b/src/legacymodule.js index 0e91cd9d..01a84cf6 100644 --- a/src/legacymodule.js +++ b/src/legacymodule.js @@ -1,6 +1,7 @@ import NullClusterClient from './io/cluster/nullclusterclient'; import Config from './config'; import IOConfiguration from './configuration/ioconfig'; +import { EventEmitter } from 'events'; class LegacyModule { getIOConfig() { @@ -15,6 +16,10 @@ class LegacyModule { return new NullClusterClient(this.getIOConfig()); } + getGlobalMessageBus() { + return new EventEmitter(); + } + onReady() { } diff --git a/src/partition/partitionconfig.js b/src/partition/partitionconfig.js index 69d3ad94..ea5a6c93 100644 --- a/src/partition/partitionconfig.js +++ b/src/partition/partitionconfig.js @@ -22,6 +22,10 @@ class PartitionConfig { getAnnouncementChannel() { return this.config.redis.announcementChannel || 'serverAnnouncements'; } + + getGlobalMessageBusChannel() { + return this.config.redis.globalMessageBusChannel || 'globalMessages'; + } } export { PartitionConfig }; diff --git a/src/partition/partitionmodule.js b/src/partition/partitionmodule.js index 10de9ef8..748393eb 100644 --- a/src/partition/partitionmodule.js +++ b/src/partition/partitionmodule.js @@ -7,6 +7,7 @@ import LegacyConfig from '../config'; import path from 'path'; import { AnnouncementRefresher } from './announcementrefresher'; import { RedisPartitionMapReloader } from './redispartitionmapreloader'; +import { RedisMessageBus } from '../pubsub/redis'; const PARTITION_CONFIG_PATH = path.resolve(__dirname, '..', '..', 'conf', 'partitions.toml'); @@ -104,6 +105,19 @@ class PartitionModule { return this.announcementRefresher; } + + getGlobalMessageBus() { + if (!this.globalMessageBus) { + const provider = this.getRedisClientProvider(); + this.globalMessageBus = new RedisMessageBus( + provider.get(), + provider.get(), + this.partitionConfig.getGlobalMessageBusChannel() + ); + } + + return this.globalMessageBus; + } } export { PartitionModule }; diff --git a/src/pubsub/redis.js b/src/pubsub/redis.js new file mode 100644 index 00000000..8f63d63a --- /dev/null +++ b/src/pubsub/redis.js @@ -0,0 +1,70 @@ +import { EventEmitter } from 'events'; +import { v4 as uuidv4 } from 'uuid'; + +const LOGGER = require('@calzoneman/jsli')('redis-messagebus'); + +class RedisMessageBus extends EventEmitter { + constructor(pubClient, subClient, channel) { + super(); + + this.pubClient = pubClient; + this.subClient = subClient; + this.channel = channel; + this.publisherID = uuidv4(); + + subClient.once('ready', this.subscribe.bind(this)); + } + + subscribe() { + this.subClient.subscribe(this.channel); + this.subClient.on('message', this.onMessage.bind(this)); + + LOGGER.info('Subscribed to Redis messages on channel %s', this.channel); + } + + onMessage(channel, message) { + if (channel !== this.channel) { + LOGGER.warn('Ignoring message from mismatched channel "%s"', channel); + return; + } + + try { + const { event, payload } = JSON.parse(message); + + this._emit(event, payload); + } catch (error) { + if (error instanceof SyntaxError) { + LOGGER.error( + 'Malformed message received: %s (message: "%s")', + message, + error + ); + } else { + LOGGER.error('Unexpected error decoding message: %s', error.stack); + } + + return; + } + } + + async emit(event, payload) { + try { + const message = JSON.stringify({ + time: new Date(), + publisher: this.publisherID, + event, + payload + }); + + await this.pubClient.publish(this.channel, message); + } catch (error) { + LOGGER.error('Unable to send event %s: %s', event, error); + } + } +} + +Object.assign(RedisMessageBus.prototype, { + _emit: EventEmitter.prototype.emit +}); + +export { RedisMessageBus }; diff --git a/src/server.js b/src/server.js index 76b596be..216f41af 100644 --- a/src/server.js +++ b/src/server.js @@ -72,6 +72,9 @@ var Server = function () { initModule = this.initModule = new LegacyModule(); } + const globalMessageBus = this.initModule.getGlobalMessageBus(); + globalMessageBus.on('UserProfileChanged', this.handleUserProfileChange.bind(this)); + // database init ------------------------------------------------------ var Database = require("./database"); self.db = Database; @@ -96,7 +99,8 @@ var Server = function () { ioConfig, clusterClient, channelIndex, - session); + session, + globalMessageBus); // http/https/sio server init ----------------------------------------- var key = "", cert = "", ca = undefined; @@ -391,3 +395,36 @@ Server.prototype.reloadPartitionMap = function () { this.initModule.getPartitionMapReloader().reload(); }; + +Server.prototype.handleUserProfileChange = function (event) { + try { + const lname = event.user.toLowerCase(); + + // Probably not the most efficient thing in the world, but w/e + // profile changes are not high volume + this.channels.forEach(channel => { + if (channel.dead) return; + + channel.users.forEach(user => { + if (user.getLowerName() === lname && user.account.user) { + user.account.user.profile = { + image: event.profile.image, + text: event.profile.text + }; + + user.account.update(); + + channel.sendUserProfile(channel.users, user); + + LOGGER.info( + 'Updated profile for user %s in channel %s', + lname, + channel.name + ); + } + }); + }); + } catch (error) { + LOGGER.error('handleUserProfileChange failed: %s', error); + } +}; diff --git a/src/web/account.js b/src/web/account.js index 09fe3458..7ef2bc29 100644 --- a/src/web/account.js +++ b/src/web/account.js @@ -17,6 +17,8 @@ const url = require("url"); const LOGGER = require('@calzoneman/jsli')('database/accounts'); +let globalMessageBus; + /** * Handles a GET request for /account/edit */ @@ -455,6 +457,14 @@ async function handleAccountProfile(req, res) { return; } + globalMessageBus.emit('UserProfileChanged', { + user: user.name, + profile: { + image, + text + } + }); + sendPug(res, "account-profile", { profileImage: image, profileText: text, @@ -661,7 +671,9 @@ module.exports = { /** * Initialize the module */ - init: function (app) { + init: function (app, _globalMessageBus) { + globalMessageBus = _globalMessageBus; + app.get("/account/edit", handleAccountEditPage); app.post("/account/edit", handleAccountEdit); app.get("/account/channels", handleAccountChannelPage); diff --git a/src/web/webserver.js b/src/web/webserver.js index f1540c55..f174ed15 100644 --- a/src/web/webserver.js +++ b/src/web/webserver.js @@ -162,7 +162,15 @@ module.exports = { /** * Initializes webserver callbacks */ - init: function (app, webConfig, ioConfig, clusterClient, channelIndex, session) { + init: function ( + app, + webConfig, + ioConfig, + clusterClient, + channelIndex, + session, + globalMessageBus + ) { const chanPath = Config.get('channel-path'); initPrometheus(app); @@ -217,7 +225,7 @@ module.exports = { app.get('/useragreement', handleUserAgreement); require('./routes/contact')(app, webConfig); require('./auth').init(app); - require('./account').init(app); + require('./account').init(app, globalMessageBus); require('./acp').init(app); require('../google2vtt').attach(app); require('./routes/google_drive_userscript')(app); diff --git a/test/pubsub/redis.js b/test/pubsub/redis.js new file mode 100644 index 00000000..b1477ab4 --- /dev/null +++ b/test/pubsub/redis.js @@ -0,0 +1,52 @@ +const assert = require('assert'); +const { RedisMessageBus } = require('../../lib/pubsub/redis'); +const { EventEmitter } = require('events'); +const sinon = require('sinon'); + +describe('RedisMessageBus', () => { + let pubClient, subClient, messageBus, publishSpy, subscribeSpy; + + beforeEach(() => { + pubClient = { publish: () => {} }; + subClient = new EventEmitter(); + + subClient.subscribe = () => {}; + subscribeSpy = sinon.spy(subClient, 'subscribe'); + + publishSpy = sinon.spy(pubClient, 'publish'); + + messageBus = new RedisMessageBus(pubClient, subClient, 'test'); + + subClient.emit('ready'); + }); + + describe('#onMessage', () => { + it('processes a valid message', done => { + messageBus.once('testEvent', payload => { + assert(subscribeSpy.withArgs('test').calledOnce); + assert.deepStrictEqual(payload, { foo: 'bar' }); + + done(); + }); + + messageBus.onMessage('test', '{"event":"testEvent","payload":{"foo":"bar"}}'); + }); + + it('processes a syntactically invalid message', done => { + messageBus.onMessage('test', 'not valid json lol'); + + done(); + }); + }); + + describe('#emit', () => { + it('emits messages', () => { + messageBus.emit('testEvent', { foo: 'bar' }); + + assert(publishSpy.withArgs('test', sinon.match(arg => { + arg = JSON.parse(arg); + return arg.event === 'testEvent' && arg.payload.foo === 'bar'; + })).calledOnce); + }); + }); +});