/**
* @module Q
*/
var Q = require('Q');
var fs = require('fs');
var path = require('path');
var util = require('util');
var Db = Q.require('Db');
var Db_Mysql = Q.require('Db/Mysql');
/**
* Different utilities.
* This object is the namespace that every public helper in this file is attached to,
* and it is what the module exports.
* @class Utils
* @namespace Q
* @static
*/
var Utils = {};
/**
 * Computes the HMAC-SHA1 signature of some data.
 * Strings are signed as they are. Anything else is key-sorted, serialized with
 * http_build_query() and has every "+" replaced by "%20" before signing.
 * @method signature
 * @param {Object|String} data The data to sign
 * @param {String} secret A secret to use for signature
 * @return {string} The signature, as returned by toString() of the HMAC object
 * @throws {Q.Exception} if secret is not defined
 */
Utils.signature = function (data, secret) {
	if (!secret) {
		throw new Q.Exception('Q.Utils.signature is expecting a secret');
	}
	var payload = data;
	if (typeof payload !== 'string') {
		// canonical form: keys sorted, query-string encoded, spaces as %20
		var sorted = ksort(payload);
		var encoded = http_build_query(sorted);
		payload = encoded.replace(/\+/g, '%20');
	}
	var digest = Q.Crypto.HmacSHA1(payload, secret);
	return digest.toString();
};
/**
 * Sign data by adding signature field.
 * The secret is read from the 'Q/internal/secret' config value. When no secret
 * is configured the data is returned untouched.
 * @method sign
 * @param {object} data The data to sign. It is modified in place.
 * @param {array} [fieldKeys] Optionally specify the array key path for the signature field.
 *  Defaults to ['Q.' + sigField], where sigField is the 'Q/internal/sigField' config value ('sig').
 * @return {object} The same data object, with the signature field set if a secret is configured
 */
Utils.sign = function (data, fieldKeys) {
	var secret = Q.Config.get(['Q', 'internal', 'secret'], null);
	if (!secret) {
		// nothing to sign with; Utils.signature would throw without a secret
		return data;
	}
	if (!fieldKeys || !fieldKeys.length) {
		var sf = Q.Config.get(['Q', 'internal', 'sigField'], 'sig');
		fieldKeys = ['Q.'+sf];
	}
	// walk (and create if needed) the containers leading to the signature field
	var ref = data;
	var last = fieldKeys.length - 1;
	for (var i=0; i<last; ++i) {
		if (!(fieldKeys[i] in ref)) {
			ref[ fieldKeys[i] ] = {};
		}
		ref = ref[ fieldKeys[i] ];
	}
	// a signature left over from a previous call must not become part of the signed payload
	delete ref[ fieldKeys[last] ];
	ref[ fieldKeys[last] ] = Utils.signature(data, secret);
	return data;
};
/**
 * Express server middleware that validates the signature of an internal request.
 * GET parameters are merged into req.body first. When no 'Q/internal/secret' is
 * configured every request is let through. Otherwise the signature field
 * ('Q.' + the 'Q/internal/sigField' config value) is removed from the data and
 * compared with the signature computed for the remaining data.
 * On mismatch the response is a 403 with a JSON error body.
 * @method validate
 * @param {object} req The request; req.body is replaced or extended with req.query
 * @param {object} res The response, used only when validation fails
 * @param {function} next Called when the request is accepted
 */
Utils.validate = function (req, res, next) {
	// merge in GET data
	if (req.body) Q.extend(req.body, req.query);
	else req.body = req.query || {};
	// validate signature
	var secret = Q.Config.get(['Q', 'internal', 'secret'], null);
	if (secret === null) {
		return next();
	}
	var sgf = "Q."+Q.Config.get(['Q', 'internal', 'sigField'], 'sig');
	var data = req.body;
	var signature = null;
	if (data[sgf]) {
		signature = data[sgf];
		// the signature itself is not part of the signed payload
		delete data[sgf];
	}
	if (signature === Q.Utils.signature(data, secret)) {
		next();
	} else {
		// the shared secret is deliberately kept out of the log
		console.log(signature);
		console.log(data);
		console.log("Request validation failed");
		res.send(JSON.stringify({errors: "Invalid signature"}), 403); // forbidden
	}
};
/**
 * Returns a shallow copy of an object whose own enumerable keys were
 * inserted in sorted (default Array.prototype.sort) order.
 * @param {object} obj The object to copy
 * @return {object} The copy with keys added in sorted order
 */
function ksort(obj) {
	return Object.keys(obj).sort().reduce(function (ordered, name) {
		ordered[name] = obj[name];
		return ordered;
	}, {});
}
/**
 * URL-encodes a string the way PHP's urlencode() does: everything except
 * letters, digits and "-_." is percent-encoded and spaces become "+".
 * encodeURIComponent leaves !'()*~ untouched, so those are encoded explicitly;
 * "~" in particular must become "%7E" to produce the same output as PHP.
 * @param {*} str The value to encode; it is converted to a string first
 * @return {string} The encoded string
 */
function urlencode (str) {
	// http://kevin.vanzonneveld.net
	str = (str + '').toString();
	return encodeURIComponent(str)
		.replace(/!/g, '%21')
		.replace(/'/g, '%27')
		.replace(/\(/g, '%28')
		.replace(/\)/g, '%29')
		.replace(/\*/g, '%2A')
		.replace(/~/g, '%7E')
		.replace(/%20/g, '+');
}
/**
 * Builds a URL-encoded query string from an object, in the manner of PHP's
 * http_build_query(). Nested objects are flattened to "key[sub]" names,
 * true/false become "1"/"0" and null members of nested objects are skipped.
 * @param {object} formdata The data to serialize
 * @param {string} [numeric_prefix] Prepended to top-level keys that are numeric
 * @param {string} [arg_separator="&"] The separator placed between pairs
 * @return {string} The query string
 * @throws {Error} if a value is a function
 */
function http_build_query (formdata, numeric_prefix, arg_separator) {
	// http://kevin.vanzonneveld.net
	var separator = arg_separator ? arg_separator : "&";
	var pairs = [];

	// serializes one name/value pair, recursing into objects
	function encodeEntry(name, value) {
		if (value === true) {
			value = "1";
		} else if (value === false) {
			value = "0";
		}
		if (value !== null && typeof(value) === "object") {
			var nested = [];
			for (var sub in value) {
				if (value[sub] === null) {
					continue;
				}
				nested.push(encodeEntry(name + "[" + sub + "]", value[sub]));
			}
			return nested.join(separator);
		}
		if (typeof(value) === "function") {
			throw new Error('There was an error processing for http_build_query().');
		}
		return urlencode(name) + "=" + urlencode(value);
	}

	for (var field in formdata) {
		var name = (numeric_prefix && !isNaN(field))
			? String(numeric_prefix) + field
			: field;
		pairs.push(encodeEntry(name, formdata[field]));
	}
	return pairs.join(separator);
}
/**
* Issues an http request, and returns the response
* @method _request
* @private
* @param {string} method The http method to use
* @param {string|array} uri The URL to request
* This can also be an array of [url, ip] to send the request
* to a particular IP, while retaining the hostname and request URI
* @param {object|string} [data=''] The associative array of data to add to query
* @param {object} [query=null] The associative array of data to post
* @param {string} [user_agent='Mozilla/5.0'] The user-agent string to send. Defaults to Mozilla.
* @param {object} [header={}] Optional associative array of headers to replace the entire header
* @param [callback=null] {function} Callback receives error and result string as arguments
*/
function _request(method, uri, data /* '' */, query /* null */, user_agent /* Mosilla */, header /* auto */, callback ) {
var that = this;
var agent = 'Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.8.1.9) Gecko/20071025 Firefox/2.0.0.9';
method = method.toLowerCase();
if (typeof data === "function") {
callback = data;
data = '';
query = null;
user_agent = agent;
} else if (typeof query === "function") {
callback = query;
query = null;
user_agent = agent;
} else if (typeof user_agent === "function") {
callback = user_agent;
user_agent = agent;
} else if (typeof header === "function") {
callback = header;
header = null;
}
if (!callback || typeof callback !== "function") return;
var ip = null, url;
if (Q.typeOf(uri) === "array") {
url = uri[0];
if (!!uri[1]) ip = uri[1];
} else url = uri;
var parts = parse_url(url);
var host = parts.host;
if (!ip) ip = host;
var request_uri = parts.path;
var port = parts.port ? ":"+parts.port : '';
var server = parts.scheme+"://"+ip+port+request_uri;
if (!header) header = {
'user-agent': user_agent,
'host': host
};
if (typeof data !== "string") data = http_build_query(data, null, '&');
var request = {
headers: header,
uri: server+"?"+data
};
if (query) request.qs = query;
require('request')[method](request, function (err, res, body) {
if (err) callback.call(that, err);
else {
if (res.statusCode >= 400) callback.call(that, new Error(body));
else callback.call(that, null, body);
}
});
}
/**
 * Issues a POST request and hands the response to the callback.
 * All arguments are forwarded unchanged to the private _request helper.
 * @method post
 * @param {string|array} url The URL to post to
 *	This can also be an array of [url, ip] to send the request
 *	to a particular IP, while retaining the hostname and request URI
 * @param {object|string} [data=''] The associative array of data or string to add to query
 * @param {array} [query=null] The associative array of data to post
 * @param {string} [user_agent='Mozilla/5.0'] The user-agent string to send. Defaults to Mozilla.
 * @param {object} [header={}] Optional associative array of headers to replace the entire header
 * @param [callback=null] {function} Callback receives error and result string as arguments
 */
Utils.post = function (url, data, query, user_agent, header, callback) {
	_request(
		'POST',
		url,
		data,
		query,
		user_agent,
		header,
		callback
	);
};
/**
 * Issues a GET request and hands the response to the callback.
 * Arguments are forwarded to the private _request helper with its
 * `query` slot fixed to null.
 * @method get
 * @param {string|array} url The URL to get from
 *	This can also be an array of [url, ip] to send the request
 *	to a particular IP, while retaining the hostname and request URI
 * @param {object|string} [data=''] The associative array of data or string to add to query
 * @param {string} [user_agent='Mozilla/5.0'] The user-agent string to send. Defaults to Mozilla.
 * @param {object} [header={}] Optional associative array of headers to replace the entire header
 * @param [callback=null] {function} Callback receives error and result string as arguments
 */
Utils.get = function (url, data, user_agent, header, callback) {
	_request(
		'GET',
		url,
		data,
		null,
		user_agent,
		header,
		callback
	);
};
/**
 * Queries a server externally to the specified handler. Expects json array with
 * either ['slots']['data'] or ['errors'] fields filled.
 * The request is posted to url + "/action.php/" + handler.
 * The callback is invoked exactly once; nothing happens without a callback.
 * @method queryExternal
 * @param {string} handler the handler to call
 * @param {array} [data={}] Associative array of data of the message to send.
 * @param {string|array} [url=null] and url to query. Default to 'Q/web/appRootUrl' config value.
 *	An array is treated as [url, ip].
 * @param {function} [callback=null] Callback receives error and the content of the 'data' slot
 *	(null when the response has neither errors nor that slot)
 */
Utils.queryExternal = function(handler, data /* {} */, url /* null */, callback)
{
	var that = this;
	if (typeof data === "function") {
		callback = data;
		data = {};
		url = null;
	} else if (typeof url === "function") {
		callback = url;
		url = null;
	}
	if (typeof callback !== "function") return;
	if (data === null || data === undefined) data = {}; // documented default
	if (typeof data !== "object") {
		callback.call(that, new Error("Utils.queryExternal: data has wrong type. Expecting 'object'"));
		return;
	}
	if (!url && !(url = Q.Config.get(['Q', 'web', 'appRootUrl'], false))) {
		callback.call(that, new Error("Root URL is not defined in Q.Utils.queryExternal"));
		return;
	}

	var query = {}, sig = 'Q.'+Q.Config.get(['Q', 'internal', 'sigField'], 'sig');
	query['Q.ajax'] = 'json';
	query['Q.slotNames'] = 'data';
	// sign a copy so that the caller's data object is left untouched
	query[sig] = Utils.sign(Q.extend({}, data, query))[sig];

	var servers = [], tail = "/action.php/"+handler;
	if (Q.typeOf(url) === "array") {
		servers.push(url[0]+tail);
		if (url.length > 1) servers.push(url[1]);
	} else {
		servers = url+tail;
	}
	Utils.post(servers, data, query, function (err, res) {
		var d;
		if (err) {
			callback.call(that, err);
			return;
		}
		try {
			d = JSON.parse(res);
		} catch (e) {
			callback.call(that, e);
			return;
		}
		if (d.errors) {
			if (d.errors[0]) callback.call(that, new Error(d.errors[0].message));
			else callback.call(that, new Error("Unknown error reported by 'Utils.post()'"));
		} else if (d.slots && d.slots.data) {
			callback.call(that, null, d.slots.data);
		} else {
			callback.call(that, null, null); // no slot was set but no error either
		}
	});
};
/**
 * Sends a query to Node.js internal server and gets the response.
 * This method shall make communications behind firewall.
 * The signed data is posted to url + "/" + handler.
 * The callback is invoked exactly once; nothing happens without a callback.
 * @method queryInternal
 * @param {string} handler the handler to call
 * @param {array} [data={}] Associative array of data of the message to send.
 * @param [url=null] {string|array} and url to query. Default is built from the
 *	'Q/nodeInternal/host' and 'Q/nodeInternal/port' config values. An array is treated as [url, ip].
 * @param [callback=null] {function} Callback receives error and the 'data' field of the response
 */
Utils.queryInternal = function(handler, data /* {} */, url /* null */, callback)
{
	var that = this;
	if (typeof data === "function") {
		callback = data;
		data = {};
		url = null;
	} else if (typeof url === "function") {
		callback = url;
		url = null;
	}
	if (typeof callback !== "function") return;
	if (data === null || data === undefined) data = {}; // documented default
	if (typeof data !== "object") {
		callback.call(that, new Error("'data' has wrong type. Expecting 'object'"));
		return;
	}
	if (!url) {
		var nodeh = Q.Config.get(['Q', 'nodeInternal', 'host'], null),
			nodep = Q.Config.get(['Q', 'nodeInternal', 'port'], null);
		if (!(url = nodep && nodeh ? "http://"+nodeh+":"+nodep : false)) {
			callback.call(that, new Error("nodeInternal server is not defined"));
			return;
		}
	}
	var server = [], tail = "/"+handler;
	if (Q.typeOf(url) === "array") {
		server.push(url[0]+tail);
		if (url.length > 1) server.push(url[1]);
	} else {
		server = url+tail;
	}
	Utils.post(server, Utils.sign(data), function (err, res) {
		var d;
		if (err) {
			callback.call(that, err);
			return;
		}
		try {
			d = JSON.parse(res);
		} catch (e) {
			callback.call(that, e);
			return;
		}
		if (d.errors) callback.call(that, d.errors);
		else callback.call(that, null, d.data);
	});
};
/**
* Sends internal message to Node.js
* @method sendToNode
* @param data {array} Associative array of data of the message to send.
* It should contain the key "Q/method" so Node can decide what to do with the message.
* @param [url=null] {string|array} and url to query. Default to 'Q/nodeInternal' config value and path '/Q/node'
* @throws {Q.Exception} if data is not object or does not contain 'Q/method' field
*/
Utils.sendToNode = function (data, url /* null */) {
if (typeof data !== 'object')
throw new Q.Exception("The message to send to node shall be an object");
if (!data['Q/method'])
throw new Q.Exception("'Q/method' is required in message for sendToNode");
if (!url) {
var nodeh = Q.Config.get(['Q', 'nodeInternal', 'host'], null),
nodep = Q.Config.get(['Q', 'nodeInternal', 'port'], null);
if (!(url = nodep && nodeh ? "http://"+nodeh+":"+nodep+"/Q/node" : false)) return false;
}
require('request').post({
headers: {
'User-Agent': 'Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.8.1.9) Gecko/20071025 Firefox/2.0.0.9'
},
uri: url+"?"+http_build_query(Utils.sign(data))
});
return true;
};
/**
 * Create folder for filename if it does not exist.
 * Missing parent folders are created recursively, one level at a time.
 * While a folder is being created the process umask is temporarily replaced by
 * the 'Q/internal/umask' config value (parsed as octal, default "0000") and
 * restored afterwards.
 * Nothing happens without a callback.
 * @method preparePath
 * @param filename {string} The filename; every "/" in it is treated as a directory separator
 * @param callback {function} Receives an error if any, or no arguments on success
 */
Utils.preparePath = function(filename, callback) {
	if (typeof callback !== "function") return;
	// convert every "/" (not only the first one) to the platform separator
	var dir = path.dirname(filename.split('/').join(Q.DS));
	fs.stat(dir, function (err, stats) {
		if (err && err.code !== 'ENOENT') {
			callback(err);
			return;
		}
		if (!err) {
			// dir exists
			if (stats.isDirectory()) callback();
			else callback(new Error("'"+dir+"' is not a directory"));
			return;
		}
		// dir does not exist: make sure its parent exists first
		Utils.preparePath(dir, function (err) {
			if (err) {
				callback(err);
				return;
			}
			// created path up to dirname(dir)
			var mask = process.umask(parseInt(Q.Config.get(['Q', 'internal', 'umask'], "0000"), 8));
			fs.mkdir(dir, function (err) {
				process.umask(mask);
				callback(err);
			});
		});
	});
};
// Module-level state of the shard split process.
// Everything here is set by the 'split' request handler and cleared by _reset_split().

// whether to write log and which log (false, or the number of the current phase)
var _logging = false;
// phase marker; reset to 0 by _reset_split()
// (declared here so that the assignment there does not create an implicit global)
var _phase = 0;
// Connection name
var _connection = null;
// Table name
var _table = null;
// table name with db and prefix
var _dbTable = null;
// the class for which shard is being split
var _class = null;
// the shard name which is being split
var _shard = null;
// the new shards config
var _shards = null;
// the partition which is being split
var _part = null;
// config for new shards
var _parts = null;
// select criteria
var _where = null;
// log file streams, indexed by phase
var _log = [];
// log file name
var _log_file = null;
// we'll monitor closing of files for clean up
var _log_pipe = null;
// config reload timeout
var _timeout = 0;
// we give 0.5 sec to check if the file can be created
var _fileTimeout = 500;
// create new Db_Mysql object to leverage caching
var _dbm = null;
// the class constructor
var _rowClass = null;
// timestamp of select query
var _timestamp = null;
/**
 * Schedules an action and prints a once-per-second countdown to stderr while waiting.
 * @param {string} message Completes the countdown text "Wait N sec to <message>"
 * @param {function} action The function to run when the timeout expires
 * @param {number} timeout The delay in milliseconds
 * @return {array} A pair [actionTimer, countdownTimer] to pass to _clearTimeout().
 *	The second element is updated in place on every tick, so the pair always
 *	refers to the countdown timer that is currently pending (or null when none is).
 */
function _setTimeout(message, action, timeout) {
	var time = Math.floor(timeout/1000);
	var handles = [setTimeout(action, timeout), null];
	function _counter() {
		process.stderr.write("Wait "+(time--)+" sec to "+message+" \r");
		// remember the newest countdown timer so that it can still be cancelled
		handles[1] = (time > 0) ? setTimeout(_counter, 1000) : null;
	}
	_counter();
	return handles;
}
/**
 * Cancels both timers of a pair produced by _setTimeout().
 * @param {array} timeout The pair [actionTimer, countdownTimer]
 */
function _clearTimeout(timeout) {
	for (var slot = 0; slot < 2; slot++) {
		clearTimeout(timeout[slot]);
	}
}
/**
 * Ends the current split process: clears all module-level split state and
 * removes the 'upcoming' config file content on the config server.
 * When the config was cleared, waits for the current _timeout (so that the
 * updated config can be reloaded) before reporting that new requests can be handled.
 */
function _reset_split() {
	// close split process and update config
	_split_log("Resetting shard split handler and configuration. Please, wait for final confirmation");

	// forget everything about the split that was in progress
	_connection = null;
	_table = null;
	_dbTable = null;
	_class = null;
	_shard = null;
	_shards = null;
	_part = null;
	_parts = null;
	_where = null;
	_log_file = null;
	_dbm = null;
	_rowClass = null;
	_timestamp = null;
	_logging = false;
	_phase = 0;

	var upcomingFile = Q.Config.get(['Q', 'internal', 'sharding', 'upcoming'], 'Db/config/upcoming.json');
	Q.Config.clearOnServer(upcomingFile, function (err) {
		if (err) {
			_split_log("Failed to clear config in file '"+upcomingFile+"'. Delete the file manually to avoid excessive messaging.", err);
			return;
		}
		_setTimeout("reload updated config", function () {
			_timeout = 0;
			_split_log("Shards split handler is reset and can handle new request");
			_split_log("The file '"+upcomingFile+"' can be safely deleted");
		}, _timeout);
	});
}
/**
 * Logger of the split process: forwards all of its arguments to console.log.
 */
function _split_log() {
	// may be modified to write log to file
	var parts = Array.prototype.slice.call(arguments);
	console.log.apply(this, parts);
}
// [url, address] of the server that receives log messages; set by the 'split' handler
var _logServer = null;
/**
 * Starts the internal server (Q.listen) and registers the handlers which drive
 * the shard split process:
 *  POST /Db/Shards  dispatches on the 'Q/method' field of the request body
 *                   ('split', 'switch', 'log', 'reset', 'writeLog')
 *  POST /Q/node     handles 'Db/Shards/log' messages
 * Requests with any other 'Q/method' are passed on to the next handler.
 * @method listen
 * @param {function} [callback] Called with the server address once the server is listening
 */
Utils.listen = function(callback) {
	// Start internal server
	var server = Q.listen();
	server.attached.express.post('/Db/Shards', function Shards_split_handler (req, res, next) {
		var parsed = req.body;
		if (!parsed || !parsed['Q/method']) return next();
		switch (parsed['Q/method']) {
			case 'split':
				if (Q.Config.get(['Db', 'upcoming', _connection, 'shard'], null) === null) {
					Q.time("Db/Shards/split");
					_log_pipe = new Q.Pipe((function () {
						var i, l = [];
						for (i=1; i<Q.Config.get(['Q', 'internal', 'sharding', 'iterations'], 1); i++) l.push(i);
						return l;
					})(), function () {
						_split_log("All logs processed");
						_log_pipe = null;
						_log = [];
					});
					_connection = parsed.connection;
					_table = parsed.table;
					_dbTable = parsed.dbTable;
					_class = parsed['class'];
					_shard = parsed.shard; // may be '' at initial split
					_part = parsed.part;
					_where = parsed.where;
					_log = []; // array of log file handlers according to the phases
					try {
						// both fields arrive as JSON strings; malformed input aborts the split
						_shards = JSON.parse(parsed.shards);
						_parts = JSON.parse(parsed.parts);
					} catch (e) {
						_split_log("Insufficient data supplied for shard split, aborting");
						_reset_split();
						res.send({data: false});
						break;
					}
					_dbm = new Db_Mysql(_connection);
					// _timeout shall be at least php timeout plus config reload timeout
					// to make sure all processes have new config
					_timeout = 1000 * (Q.Config.get(['Q', 'internal', 'configServer', 'interval'], 60) + Q.Config.get(['Q', 'internal', 'phpTimeout'], 30));
					try {
						_rowClass = Q.require(_class.split('_').join('/'));
					} catch (e) {
						_split_log("Wrong row class supplied '%s', aborting", _class);
						_reset_split();
						res.send({data: false});
						break;
					}
					// let's check supplied data
					if (!(_connection && _table && _dbTable && _shards && _class && _part && _parts && _where)) {
						_split_log("Insufficient data supplied for shard split, aborting");
						_reset_split();
						res.send({data: false});
						break;
					} else res.send({data: true});
					_logServer = [
						"http://" + req.info.host+":"+req.info.port+"/Q/node",
						server.address().address
					];
					// write 'upcoming.json'
					Q.Config.setOnServer(
						Q.Config.get(['Q', 'internal', 'sharding', 'upcoming'], 'Db/config/upcoming.json'),
						(new Q.Tree())
							.set(['Db', 'connections', _connection, 'indexes', _table], {})
							.set(['Db', 'upcoming', _connection], {shard: _shard, table: _table, dbTable: _dbTable})
							.set(['Db', 'upcoming', _connection, 'indexes', _table], _parts)
							.set(['Db', 'internal', 'sharding', 'logServer'], _logServer),
						function (err) {
							if (err) {
								_split_log("Failed to write '%s'", Q.Config.get(['Q', 'internal', 'sharding', 'upcoming'], 'Db/config/upcoming.json'));
								_reset_split();
							} else {
								// Now 'upcoming' file is ready and after config update we are ready to proceed
								// give some time for createWriteStream to check file and then send true
								_log_file = Q.Config.get(['Q', 'internal', 'sharding', 'logs'], 'files'+Q.DS+'Db'+Q.DS+'logs')+ // in 'files/DB/logs' dir
									Q.DS+'split_'+_connection+'_'+_table+'_'+_shard; // with name 'split_CONNECTION_TABLE_SHARD', later add '_phase_PHASE.log'
								Utils.preparePath(_log_file, function (err) {
									if (err) {
										_split_log("Failed to create directory for logs:", err.message);
										_reset_split();
									} else {
										_log_file_start(1, function () { // on success
											_split_log("Begin split process for class '"+_class+"', shard '"+_shard+"' ("+_part+")");
											// wait for config update to start process
											_setTimeout("activate upcoming config", _split, _timeout);
										});
									}
								});
							}
						}, true);
				} else res.send({errors: "Split process for class '"+_class+"', shard '"+_shard+"' ("+_part+") is active"});
				break;
			case 'switch':
				// now all log lines are processed,
				// indexes contain full set of new indexes for _table
				// _shards contain new shards
				// we shall update new db config and clear temporary file (unblock writes)
				// all processes will start writing to new shards
				// this is done by query handler to be able to restart process
				res.send({data: true});
				var i, shardsFile = null,
					baseName = Q.Config.get(['Q', 'internal', 'sharding', 'config'], 'Db/config/shards.json'),
					configFiles = Q.Config.get(['Q', 'configFiles'], []),
					extName = path.extname(baseName);
				// baseName is the name of the file without extension
				if ((i = baseName.lastIndexOf(extName)) >= 0)
					baseName = baseName.substr(0, i);
				if (!baseName.length) {
					// who knows how creative user is...
					baseName = 'Db/config/shards';
					extName = '.json';
				}
				for (i=0; i<configFiles.length; i++) {
					if (configFiles[i].indexOf(baseName) === 0) {
						shardsFile = configFiles[i];
						break;
					}
				}
				// first create new file for shards config
				var newShardsFile = baseName+(new Date()).toISOString().replace(/([\-:]|\.\d{3}z$)/gi, '')+extName;
				if (shardsFile) {
					Q.Config.getFromServer(shardsFile, function (err, data) {
						if (err) {
							_split_log("Config file read error ("+shardsFile+").", err.message);
							_split_log("NOTE: platform is not writing to shard '"+_shard+"'!!!");
							_split_log("Update the config file manually and then delete file '%s'", Q.Config.get(["Q", "internal", "sharding", "upcoming"], 'Db/config/upcoming.json'));
							_split_log("New shards:", _shards);
							_split_log("New indexes:", _parts);
						} else _writeShardsConfig(data);
					});
				} else _writeShardsConfig({});
				function _writeShardsConfig(data) {
					// calculate new indexes
					// get content of previous connection.indexes and connection.shards
					var local = new Q.Tree(data);
					// clear indexes config related to currently split table
					local.clear(['Db', 'connections', _connection, 'indexes', _table]);
					var connection = Q.Config.get(['Db', 'connections', _connection], {});
					var indexes = {};
					if (!connection.indexes || !connection.indexes[_table] || !Object.keys(connection.indexes[_table]).length) {
						// no sharding yet
						indexes = _parts;
					} else {
						// sharding already started
						var tmp = connection.indexes[_table].partition;
						var fields = connection.indexes[_table].fields;
						// tmp may be object or array
						// if both are arrays keep array in the config
						if (Q.typeOf(tmp) === 'array' && Q.typeOf(_parts.partition) === 'array') {
							// remove first point from array to avoid listing shard twice
							tmp = tmp.concat(_parts.partition.slice(1));
							// default (string) ordering; an array is not a valid comparator
							indexes = {partition: tmp.sort(), fields: fields};
						} else {
							if (Q.typeOf(tmp) === 'array') {
								var o = {};
								tmp.forEach(function(val) { o[val] = val; });
								tmp = o;
							}
							// now both are objects
							Q.extend(tmp, _parts.partition);
							indexes = {partition: ksort(tmp), fields: fields};
						}
					}
					// set up new indexes
					local.set(['Db', 'connections', _connection, 'indexes', _table], indexes);
					// extend shards with new shards config
					connection = local.get(['Db', 'connections', _connection], {});
					if (connection.shards) Q.extend(connection.shards, _shards);
					else connection.shards = _shards;
					// write new file with shards config
					Q.Config.setOnServer(newShardsFile, local.getAll(), function (err) {
						if (err) {
							_split_log("Config file write error ("+newShardsFile+").", err.message);
							_split_log("NOTE: platform is not writing to shard '"+_shard+"'!!!");
							_split_log("Update the config files manually and then delete file '%s'", Q.Config.get(["Q", "internal", "sharding", "upcoming"], 'Db/config/upcoming.json'));
							_split_log("New shards:", _shards);
							_split_log("New indexes:", _parts);
						} else {
							// remove old shards config and upcoming config
							Q.Config.clearOnServer(
								'Q/config/bootstrap.json',
								['Q', 'configFiles',
									[
										shardsFile,
										Q.Config.get([
											'Q', 'internal', 'sharding', 'upcoming'
										], 'Db/config/upcoming.json')
									]
								], function(err, tree) {
									if (err) {
										_split_log("Config file read error (Q/config/bootstrap.json).", err.message);
										_split_log("NOTE: platform is not writing to shard '"+_shard+"'!!!");
										_split_log("Update the config file manually and then delete file '%s'", Q.Config.get(["Q", "internal", "sharding", "upcoming"], 'Db/config/upcoming.json'));
										_split_log("New shards:", _shards);
										_split_log("New indexes:", _parts);
									} else {
										// add new shards config and save
										tree = new Q.Tree(tree);
										tree.merge({Q: {configFiles: [newShardsFile]}});
										Q.Config.setOnServer(
											'Q/config/bootstrap.json',
											tree.getAll(),
											function(err) {
												if (err) {
													_split_log("Config file write error (Q/config/bootstrap.json).", err.message);
													_split_log("NOTE: platform is not writing to shard '"+_shard+"'!!!");
													_split_log("Update the config file manually and then delete file '%s'", Q.Config.get(["Q", "internal", "sharding", "upcoming"], 'Db/config/upcoming.json'));
													_split_log("New content for 'Q/config/bootstrap.json':", tree.getAll());
												} else {
													// config was written. Now let's update platform
													_split_log("Finished split process for shard '%s' (%s) in %s", _shard, _part, Q.timeEnd("Db/Shards/split"));
													_reset_split();
												}
											}, true); // setConfig 'Q/config/bootstrap.json'
									}
								}, true); // clearConfig 'Q/config/bootstrap.json'
						}
					}, true); // setConfig newShardsFile
				} // _writeShardsConfig
				break;
			case 'log':
				res.send({data: true}); // in case logging use queryInternal
				if (_logging) {
					_log[_logging].write(
						JSON.stringify({
							shards: parsed.shards,
							sql: parsed.sql,
							timestamp: (new Date()).getTime()
						})+'\n',
						'utf-8');
				}
				break;
			case 'reset':
				// NOTE(review): the original tested an undeclared `splitting` variable, which
				// raised a ReferenceError. A split is treated as active while _connection is
				// set (it is assigned by 'split' and nulled by _reset_split) - confirm intent.
				if (_connection === null) {
					res.send({data: false});
					break;
				}
				// otherwise falls through to 'writeLog' (no break in the original)
			case 'writeLog':
				res.send({data: true});
				function _block_error(err, config) {
					_split_log("Error updating config.", err.message);
					_split_log("Failed block shard '"+_shard+"'. Log file is been written.");
					_split_log("Check and fix error, verify if file '"+config+"' exists and contains split information");
					_split_log("then run 'split.php --log-process' to continue the process");
				}
				if (_logging >= Q.Config.get(['Q', 'internal', 'sharding', 'iterations'], 1)) {
					// lock table, write last log and switch to new config
					// set up Db_Exception_Blocked response while writing log
					var upcomingConfig = Q.Config.get(['Q', 'internal', 'sharding', 'upcoming'], 'Db/config/upcoming.json');
					Q.Config.setOnServer(
						upcomingConfig,
						(new Q.Tree()).set(['Db', 'upcoming', _connection, 'block'], true),
						function (err) {
							if (err) _block_error(err, upcomingConfig);
							else {
								// now we are ready to write last log
								// need to wait for php timeout
								_setTimeout("block writing to shard '"+_shard+"'", function(phase) {
									phase = _logging;
									_dump_log(phase, function () {
										// NOTE(review): queryInternal expects (handler, data, url, callback);
										// as written the function is taken as the callback and _logServer is ignored
										Utils.queryInternal('Db/Shards', {'Q/method': 'switch'}, function(err) {
											if (err) {
												_split_log("Failed to change config files.", err.message);
												_split_log("Check and fix error, then run 'split.php --reconfigure' to continue the process");
											}
										}, _logServer);
									});
								}, _timeout);
							}
						}); // Utils.setConfig
				} else {
					// make next log file and start writing it
					// process file for current phase and start processing the next
					_log_file_start(_logging + 1, function() {
						_dump_log(_logging++, function () {
							// NOTE(review): same argument order remark as for the 'switch' query above
							Utils.queryInternal('Db/Shards', {'Q/method': 'writeLog'}, function(err) {
								if (err) {
									_split_log("Failed to start writion log.", err.message);
									_split_log("Check and fix error, then run 'split.php --log-process' to continue the process");
								}
							}, _logServer);
						});
					});
				}
				break;
			default:
				return next();
		} // switch (parsed['Q/method'])
	}); // server.attached.express.post query
	server.attached.express.post('/Q/node', function Shards_split_logger(req, res, next) {
		var parsed = req.body;
		if (!parsed || !parsed['Q/method']) return next();
		switch (parsed['Q/method']) {
			case 'Db/Shards/log': // loose logging with sendToNode
				if (_logging) {
					_log[_logging].write(JSON.stringify({
						shards: parsed.shards,
						sql: parsed.sql,
						timestamp: (new Date()).getTime()}
					)+'\n', 'utf-8');
				}
				break;
			default:
				return next();
		}
	}); // server.attached.express.post sendToNode
	// report the address in both cases: already listening, or as soon as it is
	if (server.address()) callback && callback(server.address());
	else server.once('listening', function () {
		callback && callback(server.address());
	});
};
/**
 * Actually makes the split: forks the 'Q/Utils/Split.js' child process which
 * reads rows of the old shard, and inserts the rows it reports into the new
 * shards in batches. While it runs, logging is switched on (phase 1).
 * Messages from the child are dispatched on their `type`:
 *  'start' - row count and timestamp of the select query
 *  'log'   - content to pass to _split_log
 *  'row'   - one row and the shard it belongs to
 *  'stop'  - the number of rows the child has read
 * When all read rows are processed a 'writeLog' query is issued.
 * Any failure resets the split through _reset_split().
 */
function _split() {
	if (Q.Config.get(['Db', 'upcoming', _connection, 'shard'], null) === null) {
		_split_log("Splitting cancelled!");
		return;
	}
	_split_log("Start copying old shard '"+_shard+"' ("+_part+")");
	var total = 0, read = 0, count = 0, shards = Object.keys(_shards);
	var batches = {};
	// shared by all message callbacks, so that a failure is handled only once
	var fail = false;
	shards.forEach(function(shard) {
		batches[shard] = Q.batcher(function(rows, callback) {
			// insert ['row1', 'row2', ...] to 'shard'
			if (rows.length) {
				_dbm.reallyConnect(function (client) {
					var i, s = [];
					function _escapeRow(row) {
						var key, v = [];
						for (key in row) v.push(client.escape(row[key]));
						return "("+v.join(", ")+")";
					}
					for (i=0; i<rows.length; i++) {
						s.push(_escapeRow(rows[i]));
					}
					var sql = "INSERT INTO "+_rowClass.table().toString()
						.replace('{$prefix}', _dbm.prefix())
						.replace('{$dbname}', _dbm.dbname())
						+" ("+Object.keys(rows[0]).join(", ")+") VALUES "+s.join(", ");
					client.query(sql, function(err) {
						process.stderr.write("Processed "+(count/total*100).toFixed(1)+"%\r");
						callback([err]);
					});
				}, shard, _shards[shard]);
			} else callback();
		}, {ms: 50, max: 100}); // explicit batch options
	});
	_logging = 1;
	var child = require('child_process').fork(
		Q.CLASSES_DIR+'/Q/Utils/Split.js',
		[Q.app.DIR, _class, _connection, _dbTable, _shard, _part, JSON.stringify(_parts), _where],
		{cwd: Q.CLASSES_DIR, env: process.env}
	).once('exit', function(code, signal) {
		switch (code) {
			case 0:
				child = null;
				return;
			case 99:
				break;
			default:
				if (signal) _split_log("Child process died unexpectedly on signal '%s'", signal);
				else _split_log("Child process died unexpectedly with code %d", code);
		}
		_split_log("Split process for '"+_shard+"' ("+_part+") failed!");
		child = batches = null;
		_reset_split();
	}) // on 'exit'
	.on('message', function (message) {
		if (!message.type) throw new Error("Message type is not defined");
		switch (message.type) {
			case 'start':
				Q.time("Db/Shards/copy");
				total = message.count;
				_timestamp = message.timestamp;
				break;
			case 'log':
				_split_log.apply(this, message.content);
				break;
			case 'row':
				// batches is nulled once the split has failed or was stopped
				if (!batches) break;
				batches[message.shard](message.row, function (err) {
					count++;
					if (err) {
						if (fail) return;
						fail = true;
						// the child may have exited already (its handle is nulled on clean exit)
						if (child) {
							child.removeAllListeners('message');
							child.removeAllListeners('exit');
						}
						for (var shard in batches) batches[shard].cancel();
						if (child) child.kill();
						batches = null;
						_split_log("Error processing rows of table '"+_dbTable+"'.", err.message);
						_split_log("Split process for '"+_shard+"' ("+_part+") failed!");
						_reset_split();
					} else if (count === read) {
						_split_log("Total "+count+" rows from shard '"+_shard
							+"' ("+_part+") processed in "+Q.timeEnd("Db/Shards/copy"));
						// NOTE(review): queryInternal expects (handler, data, url, callback);
						// as written the function is taken as the callback and _logServer is ignored
						Utils.queryInternal('Db/Shards', {'Q/method': 'writeLog'}, function(err) {
							if (err) {
								_split_log("Failed to start writing log.", err.message);
								_split_log("Check and fix error, then run 'split.php --log-process' to continue the process");
							}
						}, _logServer);
					}
				});
				break;
			case 'stop':
				read = message.count;
				if (read <= count) {
					for (var shard in batches) batches[shard].cancel();
					_split_log("All rows processed before read finished. Exiting.");
					if (child) child.kill();
					batches = null;
					_reset_split();
				}
				break;
			default:
				throw new Error("Message of type '"+message.type+"' is not supported");
		}
	}); // on 'message'
}
/**
 * Opens the log file of the given phase for writing and stores the stream in _log[phase].
 * The file is Q.app.DIR/<_log_file>_phase_<phase>.log. After _fileTimeout ms
 * without a stream error the '# start' marker is written and cb is called.
 * A stream error at any moment resets the whole split process.
 * @param {number} phase The phase number
 * @param {function} [cb] Called once the log file is considered usable
 */
function _log_file_start(phase, cb) {
	var _t = setTimeout(function() {
		_split_log("Start writing log file '"+_log_file+"' phase "+phase);
		_log[phase].write('# start\n');
		cb && cb();
	}, _fileTimeout);
	_log[phase] = require('fs')
		.createWriteStream(Q.app.DIR+Q.DS+_log_file+'_phase_'+phase+'.log') // relative to application dir
		.on('error', function(err) {
			// if log file error occur at any moment we consider split process broken
			// if log file cannot be created we shall clear timeouts to stop process
			_split_log("Log file error ("+_log_file+", phase "+phase+").", err.message);
			// _t is a plain timer, not a pair made by _setTimeout, so _clearTimeout would not cancel it
			clearTimeout(_t);
			_reset_split(); // if log file cannot be written the whole process fails
		// NOTE(review): `this` here is the caller's `this`, not the stream, so `this.phase`
		// is not the phase number; `phase` was presumably intended - confirm against Q.Pipe usage
		}).on('close', _log_pipe.fill(this.phase));
	_log[phase].phase = phase;
}
// incomplete last line of the chunk read so far by _dump_log
var _buffer = '';
/**
 * Closes the log file of the given phase and replays it: every logged query
 * whose timestamp is not older than _timestamp is executed on each of the
 * shards listed for it. Lines starting with '#' are markers and are skipped.
 * A line that cannot be parsed, or a failing query, resets the split process.
 * @param {number} phase The phase whose log file shall be processed
 * @param {function} [onsuccess] Called when the end of the file has been reached
 */
function _dump_log (phase, onsuccess) {
	_log[phase].end('# end');
	_split_log("Start processing log file '"+_log_file+"' phase "+phase);
	// start clean: a partial line left by an earlier (possibly aborted) dump
	// must not be glued to the first line of this file
	_buffer = '';
	var log = require('fs')
		.createReadStream(Q.app.DIR+Q.DS+_log_file+'_phase_'+phase+'.log')
		.on('error', function (err) {
			_split_log("Log file read error ("+_log_file+").", err.message);
			_split_log("Check and fix error, verify if file '"+_log_file+"' exists and contains log information");
			_split_log("then run 'split.php --log-process' to continue the process");
			this.removeAllListeners();
		}).on('data', function(data) {
			var lines = (_buffer + data).split("\n"), that = this, failed = false;
			// the last piece may be an incomplete line; keep it for the next chunk
			_buffer = lines.pop();
			lines.forEach(function(line, obj) {
				if (failed) return;
				line = line.replace("\r", '');
				// here line contains the line from log file
				if (line[0] !== '#') {
					try {
						obj = JSON.parse(line);
					} catch (e) {
						// NOTE: this may leave some file handles open
						_split_log("Error parsing log file'"+_log_file+"' phase "+phase, e);
						_split_log("Split process for '"+_shard+"' ("+_part+") failed!");
						that.removeAllListeners();
						_reset_split();
						failed = true;
						return;
					}
					if (_timestamp && obj.timestamp >= _timestamp) {
						var i, shard;
						for (i=0; i<obj.shards.length; i++) {
							shard = obj.shards[i];
							_dbm.reallyConnect(function(client) {
								var sql = obj.sql
									.replace('{$prefix}', _dbm.prefix())
									.replace('{$dbname}', _dbm.dbname());
								client.query(sql, function(err) {
									if (failed) return;
									if (err) {
										failed = true;
										_split_log("Error writing from log file.", err.message);
										_split_log("Split process for '"+_shard+"' ("+_part+") failed!");
										that.removeAllListeners();
										_reset_split();
									}
								});
							}, shard, _shards[shard]);
						}
					}
				}
			});
		}) // on data
		.on('end', function () {
			log = null;
			// what is left is the trailing '# end' marker (written without a newline)
			_buffer = '';
			_split_log("Log for phase "+phase+" has been processed");
			onsuccess && onsuccess();
		}); // on 'end'
}
/**
 * Used to split ids into one or more segments, in order to store millions
 * of files under a directory, without running into limits of various filesystems
 * on the number of files in a directory.
 * Consider using Amazon S3 or another service for uploading files in production.
 * @method splitId
 * @static
 * @param {string} id the id to split
 * @param {integer} [lengths=3] the lengths of each segment (the last one can be smaller).
 *  Anything that is not a positive integer falls back to 3.
 * @param {string} [delimiter=path.sep] the delimiter to put between segments
 * @param {string} [internalDelimiter='/'] the internal delimiter, if it is set then only the last part is split, and instances of internalDelimiter are replaced by delimiter
 * @return {string} the segments, delimited by the delimiter
 */
Utils.splitId = function(id, lengths, delimiter, internalDelimiter) {
	// a non-positive step would never advance the loop below,
	// and a string step would concatenate instead of add
	lengths = parseInt(lengths, 10);
	if (!(lengths > 0)) {
		lengths = 3;
	}
	delimiter = delimiter || path.sep;
	if (internalDelimiter === undefined) {
		internalDelimiter = '/';
	}
	var parts = [];
	if (internalDelimiter) {
		// everything before the last internal delimiter is kept as is
		parts = id.split(internalDelimiter);
		id = parts.pop();
	}
	var prefix = parts.length > 0
		? parts.join(delimiter) + delimiter
		: '';
	var segments = [];
	var pos = 0;
	var len = id.length;
	while (pos < len) {
		segments.push(id.slice(pos, pos += lengths));
	}
	return prefix + segments.join(delimiter);
};
module.exports = Utils;