Fix channel save error introduced by removing async-to-generator

This commit is contained in:
Calvin Montgomery 2018-08-29 20:59:07 -07:00
parent d9e2a62f77
commit c7fcd11e53
4 changed files with 50 additions and 30 deletions

View File

@ -2,7 +2,7 @@
"author": "Calvin Montgomery", "author": "Calvin Montgomery",
"name": "CyTube", "name": "CyTube",
"description": "Online media synchronizer and chat", "description": "Online media synchronizer and chat",
"version": "3.58.1", "version": "3.58.2",
"repository": { "repository": {
"url": "http://github.com/calzoneman/sync" "url": "http://github.com/calzoneman/sync"
}, },

View File

@ -51,13 +51,19 @@ function initChannelDumper(Server) {
var wait = CHANNEL_SAVE_INTERVAL / Server.channels.length; var wait = CHANNEL_SAVE_INTERVAL / Server.channels.length;
LOGGER.info(`Saving channels with delay ${wait}`); LOGGER.info(`Saving channels with delay ${wait}`);
Promise.reduce(Server.channels, (_, chan) => { Promise.reduce(Server.channels, (_, chan) => {
return Promise.delay(wait).then(() => { return Promise.delay(wait).then(async () => {
if (!chan.dead && chan.users && chan.users.length > 0) { if (!chan.dead && chan.users && chan.users.length > 0) {
return chan.saveState().tap(() => { try {
await chan.saveState();
LOGGER.info(`Saved /${chanPath}/${chan.name}`); LOGGER.info(`Saved /${chanPath}/${chan.name}`);
}).catch(err => { } catch (error) {
LOGGER.error(`Failed to save /${chanPath}/${chan.name}: ${err.stack}`); LOGGER.error(
}); 'Failed to save /%s/%s: %s',
chanPath,
chan ? chan.name : '<undefined>',
error.stack
);
}
} }
}).catch(error => { }).catch(error => {
LOGGER.error(`Failed to save channel: ${error.stack}`); LOGGER.error(`Failed to save channel: ${error.stack}`);

View File

@ -9,8 +9,6 @@ import { ChannelStateSizeError } from '../errors';
import { EventEmitter } from 'events'; import { EventEmitter } from 'events';
import { throttle } from '../util/throttle'; import { throttle } from '../util/throttle';
import Logger from '../logger'; import Logger from '../logger';
// Not directly used, but needs to be in scope for async functions
import Promise from 'bluebird';
const LOGGER = require('@calzoneman/jsli')('channel'); const LOGGER = require('@calzoneman/jsli')('channel');

View File

@ -377,15 +377,17 @@ Server.prototype.setAnnouncement = function (data) {
}; };
Server.prototype.forceSave = function () { Server.prototype.forceSave = function () {
Promise.map(this.channels, channel => { Promise.map(this.channels, async channel => {
try { try {
return channel.saveState().tap(() => { await channel.saveState();
LOGGER.info(`Saved /${this.chanPath}/${channel.name}`); LOGGER.info(`Saved /${this.chanPath}/${channel.name}`);
}).catch(err => {
LOGGER.error(`Failed to save /${this.chanPath}/${channel.name}: ${err.stack}`);
});
} catch (error) { } catch (error) {
LOGGER.error(`Failed to save channel: ${error.stack}`); LOGGER.error(
'Failed to save /%s/%s: %s',
this.chanPath,
channel ? channel.name : '<undefined>',
error.stack
);
} }
}, { concurrency: 5 }).then(() => { }, { concurrency: 5 }).then(() => {
LOGGER.info('Finished save'); LOGGER.info('Finished save');
@ -394,15 +396,17 @@ Server.prototype.forceSave = function () {
Server.prototype.shutdown = function () { Server.prototype.shutdown = function () {
LOGGER.info("Unloading channels"); LOGGER.info("Unloading channels");
Promise.map(this.channels, channel => { Promise.map(this.channels, async channel => {
try { try {
return channel.saveState().tap(() => { await channel.saveState();
LOGGER.info(`Saved /${this.chanPath}/${channel.name}`); LOGGER.info(`Saved /${this.chanPath}/${channel.name}`);
}).catch(err => {
LOGGER.error(`Failed to save /${this.chanPath}/${channel.name}: ${err.stack}`);
});
} catch (error) { } catch (error) {
LOGGER.error(`Failed to save channel: ${error.stack}`); LOGGER.error(
'Failed to save /%s/%s: %s',
this.chanPath,
channel ? channel.name : '<undefined>',
error.stack
);
} }
}, { concurrency: 5 }).then(() => { }, { concurrency: 5 }).then(() => {
LOGGER.info("Goodbye"); LOGGER.info("Goodbye");
@ -415,16 +419,23 @@ Server.prototype.shutdown = function () {
Server.prototype.handlePartitionMapChange = function () { Server.prototype.handlePartitionMapChange = function () {
const channels = Array.prototype.slice.call(this.channels); const channels = Array.prototype.slice.call(this.channels);
Promise.map(channels, channel => { Promise.map(channels, async channel => {
if (channel.dead) { if (channel.dead) {
return; return;
} }
if (!this.partitionDecider.isChannelOnThisPartition(channel.uniqueName)) { if (!this.partitionDecider.isChannelOnThisPartition(channel.uniqueName)) {
LOGGER.info("Partition changed for " + channel.uniqueName); LOGGER.info("Partition changed for " + channel.uniqueName);
return channel.saveState().then(() => { try {
channel.broadcastAll("partitionChange", await channel.saveState();
this.partitionDecider.getPartitionForChannel(channel.uniqueName));
channel.broadcastAll(
"partitionChange",
this.partitionDecider.getPartitionForChannel(
channel.uniqueName
)
);
const users = Array.prototype.slice.call(channel.users); const users = Array.prototype.slice.call(channel.users);
users.forEach(u => { users.forEach(u => {
try { try {
@ -433,11 +444,16 @@ Server.prototype.handlePartitionMapChange = function () {
// Ignore // Ignore
} }
}); });
this.unloadChannel(channel, { skipSave: true }); this.unloadChannel(channel, { skipSave: true });
}).catch(error => { } catch (error) {
LOGGER.error(`Failed to unload /${this.chanPath}/${channel.name} for ` + LOGGER.error(
`partition map flip: ${error.stack}`); 'Failed to unload /%s/%s for partition map flip: %s',
}); this.chanPath,
channel ? channel.name : '<undefined>',
error.stack
);
}
} }
}, { concurrency: 5 }).then(() => { }, { concurrency: 5 }).then(() => {
LOGGER.info("Partition reload complete"); LOGGER.info("Partition reload complete");