Show:

File: platform/classes/Q/Utils/Split.js

/**
 * @module Q
 */
/**
 * Extra process for shards split
 * @namespace Q.Utils
 * @class Split
 * @static
 */
var Q = require('Q');

Q.on('init', function () {
	var Db_Mysql = Q.require('Db/Mysql');
	var Utils = Q.require('Q/Utils');
	try {
		var _class = process.argv[3];
		var _connection = process.argv[4];
		var _dbTable = process.argv[5];
		var _shard = process.argv[6];
		var _part = process.argv[7];
		var _parts = JSON.parse(process.argv[8]);
		var _where = process.argv[9];
		var _rowClass = Q.require(_class.split('_').join('/')); // shall be fine as tested by parent process
	} catch (err) {
		process.send({type: 'log', content: ["Wrong arguments supplied to child process.", err.message]});
		process.exit(99);
	}

	var read = 0;
	// now read original table and save to new shards
	(new Db_Mysql(_connection)).reallyConnect(function(client) {
		client.query("SELECT COUNT(*) AS count FROM "+_dbTable+" WHERE "+_where, function (err, res) {
			if (err) {
				process.send({type: 'log', content: ["Could not count records in '%s'.", _dbTable, err.message]});
				client.destroy();
				process.exit(99);
			}
			// load all data from the old table, pass it trough new partition and save to new shards
			process.send({type: 'start', count: res[0].count, timestamp: (new Date()).getTime()});
			client.query("SELECT * FROM "+_dbTable+" WHERE "+_where)
				// when query has no callback only event is emmited on error
				.on('error', function (err) {
					if (err) {
						process.send({type: 'log', content: ["Error processing table '%s'.", _dbTable, err.message]});
						client.destroy();
						process.exit(99);
					}
				}) // on 'error'
				.on('row', function (row) {
					read++;
					var shards = Object.keys((new _rowClass(row)).retrieve('*', true, true).shard(_parts));
					if (shards.length !== 1) {
						process.send({type: 'log', content: ["Could not determine unique shard to save row.\nfound shard(s)\n'%j'\nfor row\n%j", shards, row]});
						client.destroy();
						process.exit(99);
					}
					process.send({type: 'row', shard: shards[0], row: row});
				}) // on 'row'
				.on('end', function () {
					process.send({type: 'stop', count: read});
					client.end(function (err) {
						if (err) process.send({type: 'log', content: ["Error closing database connection.", err.message]});
						process.exit(0);
					});
				}); // on 'end'
		});
	}, _shard); // reallyConnect
});

Q.init({DIR: process.argv[2]}, "Child process to split data to new shards forked");