mirror of https://github.com/calzoneman/sync.git
Add configuration for redis key
This commit is contained in:
parent
654d57b53e
commit
d159a16aca
|
@ -53,10 +53,11 @@ function loadPartitionMap(filename) {
|
||||||
}
|
}
|
||||||
|
|
||||||
const client = partitionModule.getRedisClientProvider().get();
|
const client = partitionModule.getRedisClientProvider().get();
|
||||||
|
const config = partitionModule.partitionConfig;
|
||||||
client.once('ready', () => {
|
client.once('ready', () => {
|
||||||
client.multi()
|
client.multi()
|
||||||
.set('partitionMap', JSON.stringify(newMap))
|
.set(config.getPartitionMapKey(), JSON.stringify(newMap))
|
||||||
.publish('partitionMap', new Date().toISOString())
|
.publish(config.getPublishChannel(), new Date().toISOString())
|
||||||
.execAsync()
|
.execAsync()
|
||||||
.then(result => {
|
.then(result => {
|
||||||
console.log(`Result: ${result}`);
|
console.log(`Result: ${result}`);
|
||||||
|
|
|
@ -10,6 +10,14 @@ class PartitionConfig {
|
||||||
getRedisConfig() {
|
getRedisConfig() {
|
||||||
return this.config.redis;
|
return this.config.redis;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
getPublishChannel() {
|
||||||
|
return this.config.redis.publishChannel;
|
||||||
|
}
|
||||||
|
|
||||||
|
getPartitionMapKey() {
|
||||||
|
return this.config.redis.partitionMapKey;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export { PartitionConfig };
|
export { PartitionConfig };
|
||||||
|
|
|
@ -49,6 +49,22 @@ class PartitionMap {
|
||||||
}
|
}
|
||||||
|
|
||||||
static fromJSON(json) {
|
static fromJSON(json) {
|
||||||
|
if (json === null) {
|
||||||
|
throw new Error('Cannot construct PartitionMap: input is null');
|
||||||
|
} else if (typeof json !== 'object') {
|
||||||
|
throw new Error(`Cannot construct PartitionMap from input "${json}" of type `
|
||||||
|
+ typeof json);
|
||||||
|
} else if (!json.partitions || typeof json.partitions !== 'object') {
|
||||||
|
throw new Error('Cannot construct PartitionMap: field partitions must be '
|
||||||
|
+ `an object but was "${json.partitions}"`);
|
||||||
|
} else if (!json.overrides || typeof json.overrides !== 'object') {
|
||||||
|
throw new Error('Cannot construct PartitionMap: field overrides must be '
|
||||||
|
+ `an object but was "${json.overrides}"`);
|
||||||
|
} else if (!json.pool || !Array.isArray(json.pool)) {
|
||||||
|
throw new Error('Cannot construct PartitionMap: field pool must be '
|
||||||
|
+ `an array but was "${json.pool}"`);
|
||||||
|
}
|
||||||
|
|
||||||
return new PartitionMap(json.partitions, json.pool, json.overrides);
|
return new PartitionMap(json.partitions, json.pool, json.overrides);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -50,6 +50,7 @@ class PartitionModule {
|
||||||
if (!this.partitionMapReloader) {
|
if (!this.partitionMapReloader) {
|
||||||
const redisProvider = this.getRedisClientProvider();
|
const redisProvider = this.getRedisClientProvider();
|
||||||
this.partitionMapReloader = new RedisPartitionMapReloader(
|
this.partitionMapReloader = new RedisPartitionMapReloader(
|
||||||
|
this.partitionConfig,
|
||||||
redisProvider.get(), // Client for GET partitionMap
|
redisProvider.get(), // Client for GET partitionMap
|
||||||
redisProvider.get()); // Subscribe client
|
redisProvider.get()); // Subscribe client
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,8 +3,9 @@ import logger from 'cytube-common/lib/logger';
|
||||||
import { EventEmitter } from 'events';
|
import { EventEmitter } from 'events';
|
||||||
|
|
||||||
class RedisPartitionMapReloader extends EventEmitter {
|
class RedisPartitionMapReloader extends EventEmitter {
|
||||||
constructor(redisClient, subClient) {
|
constructor(config, redisClient, subClient) {
|
||||||
super();
|
super();
|
||||||
|
this.config = config;
|
||||||
this.redisClient = redisClient;
|
this.redisClient = redisClient;
|
||||||
this.subClient = subClient;
|
this.subClient = subClient;
|
||||||
this.partitionMap = PartitionMap.empty();
|
this.partitionMap = PartitionMap.empty();
|
||||||
|
@ -13,20 +14,21 @@ class RedisPartitionMapReloader extends EventEmitter {
|
||||||
}
|
}
|
||||||
|
|
||||||
subscribe() {
|
subscribe() {
|
||||||
this.subClient.subscribe('partitionMap');
|
this.subClient.subscribe(this.config.getPublishChannel());
|
||||||
this.subClient.on('message', (channel, message) => {
|
this.subClient.on('message', (channel, message) => {
|
||||||
if (channel !== 'partitionMap') {
|
if (channel !== this.config.getPublishChannel()) {
|
||||||
logger.warn('RedisPartitionMapReloader received unexpected message '
|
logger.warn('RedisPartitionMapReloader received unexpected message '
|
||||||
+ `on redis channel ${channel}`);
|
+ `on redis channel ${channel}`);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logger.info(`Received partition map update message published at ${message}`);
|
||||||
this.reload();
|
this.reload();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
reload() {
|
reload() {
|
||||||
this.redisClient.getAsync('partitionMap').then(result => {
|
this.redisClient.getAsync(this.config.getPartitionMapKey()).then(result => {
|
||||||
var newMap = null;
|
var newMap = null;
|
||||||
try {
|
try {
|
||||||
newMap = PartitionMap.fromJSON(JSON.parse(result));
|
newMap = PartitionMap.fromJSON(JSON.parse(result));
|
||||||
|
|
Loading…
Reference in New Issue