Merge pull request #671 from calzoneman/knex

The knexening: part 1
This commit is contained in:
Calvin Montgomery 2017-05-29 13:16:35 -07:00 committed by GitHub
commit 7fcf31dec6
3 changed files with 72 additions and 93 deletions

View File

@ -31,6 +31,7 @@
"graceful-fs": "^4.1.2",
"http-errors": "^1.3.1",
"json-typecheck": "^0.1.3",
"knex": "^0.13.0",
"lodash": "^4.13.1",
"morgan": "^1.6.1",
"mysql": "^2.9.0",

View File

@ -6,52 +6,68 @@ var net = require("net");
var util = require("./utilities");
import * as Metrics from 'cytube-common/lib/metrics/metrics';
import { LoggerFactory } from '@calzoneman/jsli';
import knex from 'knex';
const LOGGER = LoggerFactory.getLogger('database');
var pool = null;
var global_ipbans = {};
let db = null;
class Database {
constructor() {
const config = {
client: 'mysql',
connection: {
host: Config.get('mysql.server'),
port: Config.get('mysql.port'),
user: Config.get('mysql.user'),
password: Config.get('mysql.password'),
database: Config.get('mysql.database'),
multipleStatements: true, // Legacy thing
charset: 'UTF8MB4_GENERAL_CI'
},
pool: {
min: Config.get('mysql.pool-size'),
max: Config.get('mysql.pool-size')
},
debug: !!process.env.KNEX_DEBUG
};
this.knex = knex(config);
}
}
module.exports.init = function () {
pool = mysql.createPool({
host: Config.get("mysql.server"),
port: Config.get("mysql.port"),
user: Config.get("mysql.user"),
password: Config.get("mysql.password"),
database: Config.get("mysql.database"),
multipleStatements: true,
charset: "UTF8MB4_GENERAL_CI", // Needed for emoji and other non-BMP unicode
connectionLimit: Config.get("mysql.pool-size")
});
// Test the connection
pool.getConnection(function (err, conn) {
if (err) {
LOGGER.error("Initial database connection failed: " + err.stack);
process.exit(1);
} else {
tables.init(module.exports.query, function (err) {
if (err) {
return;
}
require("./database/update").checkVersion();
module.exports.loadAnnouncement();
db = new Database();
db.knex.raw('select 1 from dual')
.catch(error => {
LOGGER.error('Initial database connection failed: %s', error.stack);
process.exit(1);
}).then(() => {
process.nextTick(legacySetup);
});
// Refresh global IP bans
module.exports.listGlobalBans();
}
});
pool.on("enqueue", function () {
Metrics.incCounter("db:queryQueued", 1);
});
global_ipbans = {};
module.exports.users = require("./database/accounts");
module.exports.channels = require("./database/channels");
module.exports.pool = pool;
};
module.exports.getDB = function getDB() {
return db;
};
function legacySetup() {
tables.init(module.exports.query, function (err) {
if (err) {
return;
}
require("./database/update").checkVersion();
module.exports.loadAnnouncement();
});
// Refresh global IP bans
module.exports.listGlobalBans();
}
/**
* Execute a database query
*/
@ -60,50 +76,26 @@ module.exports.query = function (query, sub, callback) {
// 2nd argument is optional
if (typeof sub === "function") {
callback = sub;
sub = false;
sub = undefined;
}
if (typeof callback !== "function") {
callback = blackHole;
}
pool.getConnection(function (err, conn) {
if (err) {
LOGGER.error("! DB connection failed: " + err);
callback("Database failure", null);
} else {
function cback(err, res) {
conn.release();
if (err) {
LOGGER.error("! DB query failed: " + query);
if (sub) {
LOGGER.error("Substitutions: " + sub);
}
LOGGER.error(err);
callback("Database failure", null);
} else {
callback(null, res);
}
Metrics.stopTimer(timer);
}
if (process.env.SHOW_SQL) {
console.log(query);
}
if (process.env.SHOW_SQL) {
console.log(query);
}
try {
if (sub) {
conn.query(query, sub, cback);
} else {
conn.query(query, cback);
}
} catch (error) {
LOGGER.error("Broken query: " + error.stack);
callback("Broken query", null);
conn.release();
}
}
});
db.knex.raw(query, sub)
.then(res => {
process.nextTick(callback, null, res[0]);
}).catch(error => {
LOGGER.error('Legacy DB query failed. Query: %s, Substitutions: %j, Error: %s', query, sub, error);
process.nextTick(callback, 'Database failure', null);
}).finally(() => {
Metrics.stopTimer(timer);
});
};
/**

View File

@ -363,29 +363,15 @@ function populateUsernameDedupeColumn(cb) {
}
Promise.map(rows, row => {
return new Promise((resolve, reject) => {
db.pool.getConnection((error, conn) => {
if (error) {
reject(error);
return;
}
const dedupedName = dbUsers.dedupeUsername(row.name);
LOGGER.info(`Deduping [${row.name}] as [${dedupedName}]`);
conn.query("UPDATE users SET name_dedupe = ? WHERE id = ?", [dedupedName, row.id], (error, res) => {
conn.release();
if (error) {
if (error.errno === 1062) {
LOGGER.info(`WARNING: could not set name_dedupe for [${row.name}] due to an existing row for [${dedupedName}]`);
resolve();
} else {
reject(error);
}
} else {
resolve();
}
});
});
const dedupedName = dbUsers.dedupeUsername(row.name);
LOGGER.info(`Deduping [${row.name}] as [${dedupedName}]`);
return db.getDB().knex.raw("UPDATE users SET name_dedupe = ? WHERE id = ?", [dedupedName, row.id])
.catch(error => {
if (error.errno === 1062) {
LOGGER.info(`WARNING: could not set name_dedupe for [${row.name}] due to an existing row for [${dedupedName}]`);
} else {
throw error;
}
});
}, { concurrency: 10 }).then(() => {
cb();