Add partitioning logic

This commit is contained in:
calzoneman 2016-06-06 21:54:49 -07:00
parent 5f773d46c9
commit 77465e6b49
6 changed files with 168 additions and 0 deletions

View File

@ -0,0 +1,15 @@
import Promise from 'bluebird';
class PartitionClusterClient {
constructor(partitionDecider) {
this.partitionDecider = partitionDecider;
}
getSocketConfig(channel) {
return Promise.resolve({
servers: this.partitionDecider.getPartitionForChannel(channel)
});
}
}
export { PartitionClusterClient };

View File

@ -0,0 +1,23 @@
class PartitionConfig {
constructor(config) {
this.config = config;
}
getPartitionMap() {
return this.config.partitions;
}
getOverrideMap() {
return this.config.overrides;
}
getPool() {
return this.config.pool;
}
getIdentity() {
return this.config.identity;
}
}
export { PartitionConfig };

View File

@ -0,0 +1,29 @@
import { murmurHash1 } from '../util/murmur';
class PartitionDecider {
constructor(config) {
this.identity = config.getIdentity();
this.partitionMap = config.getPartitionMap();
this.pool = config.getPool();
this.overrideMap = config.getOverrideMap();
}
getPartitionForChannel(channel) {
return this.partitionMap[this.getPartitionIdentityForChannel(channel)];
}
getPartitionIdentityForChannel(channel) {
if (this.overrideMap.hasOwnProperty(channel)) {
return this.overrideMap[channel];
} else {
const i = murmurHash1(channel) % this.pool.length;
return this.pool[i];
}
}
isChannelOnThisPartition(channel) {
return this.getPartitionIdentityForChannel(channel) === this.identity;
}
}
export { PartitionDecider };

View File

@ -0,0 +1,63 @@
import { loadFromToml } from 'cytube-common/lib/configuration/configloader';
import { PartitionConfig } from './partitionconfig';
import { PartitionDecider } from './partitiondecider';
import { PartitionClusterClient } from '../io/cluster/partitionclusterclient';
import logger from 'cytube-common/lib/logger';
import LegacyConfig from '../config';
import path from 'path';
const PARTITION_CONFIG_PATH = path.resolve(__dirname, '..', '..', 'conf',
'partitions.toml');
class PartitionModule {
constructor() {
this.initConfig();
}
onReady() {
}
initConfig() {
logger.initialize(null, null, LegacyConfig.get('debug'));
try {
this.partitionConfig = this.loadPartitionMap();
} catch (error) {
process.exit(1);
}
}
loadPartitionMap() {
try {
return loadFromToml(PartitionConfig, PARTITION_CONFIG_PATH);
} catch (error) {
if (typeof error.line !== 'undefined') {
logger.error(`Error in ${PARTITION_CONFIG_PATH}: ${error} ` +
`(line ${error.line})`);
} else {
logger.error(`Error loading ${PARTITION_CONFIG_PATH}: ` +
`${error.stack}`);
}
throw error;
}
}
getPartitionDecider() {
if (!this.partitionDecider) {
this.partitionDecider = new PartitionDecider(this.partitionConfig);
}
return this.partitionDecider;
}
getClusterClient() {
if (!this.partitionClusterClient) {
this.partitionClusterClient = new PartitionClusterClient(
this.getPartitionDecider());
}
return this.partitionClusterClient;
}
}
export { PartitionModule };

View File

@ -48,6 +48,7 @@ import WebConfiguration from './configuration/webconfig';
import NullClusterClient from './io/cluster/nullclusterclient';
import session from './session';
import { LegacyModule } from './legacymodule';
import { PartitionModule } from './partition/partitionmodule';
import * as Switches from './switches';
var Server = function () {
@ -68,6 +69,8 @@ var Server = function () {
}
const BackendModule = require('./backend/backendmodule').BackendModule;
initModule = new BackendModule();
} else if (Config.get('enable-partition')) {
initModule = new PartitionModule();
} else {
initModule = new LegacyModule();
}

35
src/util/murmur.js Normal file
View File

@ -0,0 +1,35 @@
const SEED = 0x1234;
const M = 0xc6a4a793;
const R = 16;
export function murmurHash1(str) {
const buffer = new Buffer(str, 'utf8');
var length = buffer.length;
var h = SEED ^ (length * M);
while (length >= 4) {
var k = buffer.readUInt32LE(buffer.length - length);
h += k;
h *= M;
h ^= h >> 16;
length -= 4;
}
switch (length) {
case 3:
h += buffer[buffer.length - 3] >> 16;
case 2:
h += buffer[buffer.length - 2] >> 8;
case 1:
h += buffer[buffer.length - 1];
h *= M;
h ^= h >> R;
}
h *= M;
h ^= h >> 10;
h *= M;
h ^= h >> 17;
return h;
}