Show:

File: platform/classes/Db/Mysql.js

/**
 * @module Db
 */
var Q = require('Q');
var Db = Q.require('Db');
var util = require('util');
	
/**
 * MySQL connection class
 * @class Mysql
 * @namespace Db
 * @constructor
 * @param {String} connName The name of connection
 * @param {String} dsn The DSN string to make connection
 * @throws {Q.Exception} If database connection is not registered with Db module
 */
function Db_Mysql(connName, dsn) {
	
	/**
	 * Connection information
	 * @property info
	 * @type object
	 * @private
	 */
	var info = Db.getConnection(connName);
	if (!info) {
		throw new Q.Exception("Database connection \""+connName+"\" wasn't registered with Db");
	}
	if (!dsn) {
		dsn = Db.parseDsnString(info['dsn']);
	}
	
	var dbm = this;
	/**
	 * The name of connection
	 * @property connName
	 * @type string
	 */
	dbm.connName = connName;
	/**
	 * The connection created with mysql.createConnection()
	 * @property connection
	 * @type mysql.Connection
	 * @default null
	 */
	dbm.connection = null;
	/**
	 * Wheather connection is connected to database
	 * @property connected
	 * @type boolean
	 * @default false
	 */
	dbm.connected = false;
	
	/**
	 * Cache of connections
	 * @property connections
	 * @type object
	 * @default {}
	 * @private
	 */
	var connections = {};

	function mysqlConnection(host, port, user, password, database, options, ignoreExisting) {
		var key = [host, port, user, password, database].join("\t");
		if (!ignoreExisting && connections[key]) {
			return connections[key];
		}
		var o = Q.extend({
			host: host,
			port: port,
			user: user,
			password: password,
			database: database,
			multipleStatements: true
		}, options);
		return connections[key] = require('mysql').createConnection(o);
	}

	/**
	 * Retrieve connection information possibly modified for particular shard
	 * @method info
	 * @param {String} [shardName=''] The name of the shard, defaults to '' - i.e. main table
	 * @param {Object} [modifications={}] Additional modifications to table information. If supplied override shard modifications
	 * @return {Object} Parsed dsn string with possible modifications
	 */
	dbm.info = function(shardName, modifications) {
		modifications = modifications || Db.getShard(this.connName, shardName || '') || {};
		return Q.extend({}, info, dsn, modifications, (modifications['dsn'] ? Db.parseDsnString(modifications['dsn']): {}));
	};

	/**
	 * Create mysql.Connection and connects to the database table
	 * @method reallyConnect
	 * @param {Function} callback The callback is fired after connection is complete. mysql.Connection is passed as argument
	 * @param {String} [shardName=''] The name of the shard to connect
	 * @param {Object} [modifications={}] Additional modifications to table information. If supplied override shard modifications
	 * @param {Boolean} [dontReconnect=false] Pass true here to avoid automatically reconnecting when connection is lost
	 */
	dbm.reallyConnect = function(callback, shardName, modifications, dontReconnect) {
	
		function _setUpConnection() {
			if (Q.Config.get(['Db', 'debug'], false)) {
				connection._original_query = connection.query;
				connection.query = function (sql) {
					Q.log(
						"--> db="+connection.config.database+": " + sql.replace(/[\n\t]+/g, " ") + "\n",
						'mysql'
					);
					return connection._original_query.apply(connection, arguments);
				};
			}
			connection.on('error', function _Db_Mysql_onConnectionError(err, mq) {
				if (err.code === "PROTOCOL_CONNECTION_LOST" && !dontReconnect) {
					connection = mysqlConnection(
						info.host,
						info.port || 3306,
						info.username,
						info.password,
						info.dbname,
						info.options,
						true
					);
					connection.connect(function (err) {
						if (err) {
							throw new Q.Exception("Db.Mysql connection failed to reconnect to " + connection.config.database, 'mysql')
							return;
						}
						Q.log("Db.Mysql reconnected to " + connection.config.database, 'mysql');
					});
					return _setUpConnection();
				}
				Q.log("Db.Mysql error: " + err, 'mysql');
				if (mq) {
					mq.getSQL(function (repres) {
						Q.log("Query was: " + repres, 'mysql');
					});
				}
				if (!Q.Config.expect(['Db', 'survive', 'mysql'])) {
					console.log("Db: MySQL error, see files/Q/logs/mysql_node.log");
					process.exit();
					// our app will survive mysql errors, and continue operating
				}
			});
			var mt = require('moment-timezone');
			var timezone = Q.Config.expect(['Q', 'defaultTimezone']);
			var offset = mt.tz.zone(timezone);
			offset = typeof offset.utcOffset === 'function' ? offset.utcOffset(Date.now()) : offset.offset(Date.now());
		    var dt = new Date(
				Math.abs(offset) * 60000 + new Date(2000, 0).getTime()
		    ).toTimeString();
		    var tz = (offset < 0 ? '-' : '+') + dt.substr(0,2) + ':' + dt.substr(3,2);
			connection.query('SET NAMES UTF8; SET time_zone = "'+tz+'"');
		}
		
		info = this.info(shardName, modifications);
		var connection = mysqlConnection(
			info.host,
			info.port || 3306,
			info.username,
			info.password,
			info.dbname,
			info.options
		);
		if (!dbm.connected) {
			_setUpConnection();
			dbm.connected = true;
		}
		callback && callback(connection);
		return connection;
	};
	/**
	 * Retrieves table prefix to use with connection
	 * @method prefix
	 * @return {string}
	 */
	dbm.prefix = function() {
		return info.prefix;
	};
	
	/**
	 * Retrieves database name to use with connection
	 * @method dbname
	 * @return {string}
	 */
	dbm.dbname = function() {
		return info.dbname;
	};
	
	/**
	 * Creates a raw query.
	 * @method rawQuery
	 * @param {String} query The query string
	 * @param {Object|Array} parameters The parameters to add to the query right away (to be bound when executing). Values corresponding to numeric keys replace question marks, while values corresponding to string keys replace ":key" placeholders, in the SQL.
	 * @return {Db.Query.Mysql} The resulting Db.Query object
	 */
	dbm.rawQuery = function(query, parameters) {
		query = query.replaceAll({
			'{$prefix}': dbm.prefix()
		});
		return new Db.Query.Mysql(this, Db.Query.TYPE_RAW, {"RAW": query}, parameters);
	};

	/**
	 * Creates a query to rollback a previously started transaction.
	 * @method rollback
	 * @param {array} $criteria The criteria to use, for sharding
	 * @return {Db_Query_Mysql} The resulting Db_Query object
	 */
	dbm.rollback = function(criteria) {
		return new Db.Query.Mysql(this, Db.Query.TYPE_ROLLBACK).rollback(criteria);
	};

	/**
	 * Creates a query to select fields from a table. Needs to be used with Db.Query.from().
	 * @method SELECT
	 * @param {String|Object} fields The fields as strings, or associative array of `{alias: field}` pairs
	 * @param {String|Object} tables The tables as strings, or associative array of `{alias: table}` pairs
	 * @return {Db.Query.Mysql} The resulting Db.Query object
	 */
	dbm.SELECT = function(fields, tables) {
		if (!fields)
			throw new Q.Exception("fields not specified in call to 'select'.");
		if (tables === undefined)
			throw new Q.Exception("tables not specified in call to 'select'.");
		var query = new Db.Query.Mysql(this, Db.Query.TYPE_SELECT);
		return query.SELECT(fields, tables);
	};
	
	/**
	 * Creates a query to insert a row into a table
	 * @method INSERT
	 * @param {String} table_into The name of the table to insert into
	 * @param {Object} fields The fields as an associative of `{column: value}` pairs
	 * @return {Db.Query.Mysql} The resulting Db.Query object
	 */
	dbm.INSERT = function(table_into, fields) {
		if (!table_into)
			throw new Q.Exception("table not specified in call to 'insert'.");

		// fields might be an empty array,
		// but the insert will still be attempted.

		var columns_list = [];
		var values_list = [];
		for (var column in fields) {
			var value = fields[column];
			columns_list.push(column);
			if (value && value.typename === 'Db.Expression') {
				values_list.push(value.valueOf());
			} else {
				values_list.push(":" + column);
			}
		}
		var columns_string = columns_list.join(', ');
		var values_string = values_list.join(', ');

		var clauses = {
			"INTO": table_into,
			"FIELDS": columns_string,
			"VALUES": values_string
		};

		return new Db.Query.Mysql(this, Db.Query.TYPE_INSERT, clauses, fields, table_into);
	};
	
	/**
	 * Creates a query to update rows. Must be used with Db.Query.set
	 * @method UPDATE
	 * @param {String} table The table to update
	 * @return {Db.Query.Mysql} The resulting Db.Query object
	 */
	dbm.UPDATE = function (table) {
		if (!table)
			throw new Q.Exception("table not specified in call to 'update'.");
		var clauses = {"UPDATE": table};
		return new Db.Query.Mysql(this, Db.Query.TYPE_UPDATE, clauses, null, table);
	};

	/**
	 * Creates a query to delete rows.
	 * @method DELETE
	 * @param {String} table_from The table to delete from
	 * @param {String} [table_using=null] If set, adds a USING clause with this table.
	 *  You can then use .join() with the resulting Db_Query.
	 * @return {Db.Query.Mysql} The resulting Db.Query object
	 */
	dbm.DELETE = function (table_from, table_using) {
		if (!table_from)
			throw new Q.Exception("table not specified in call to 'delete'.");

		var clauses = table_using ? {"FROM": table_from + " USING " + table_using} : {"FROM": table_from};
		return new Db.Query.Mysql(this, Db.Query.TYPE_DELETE, clauses, null, table_from);
	};
	
	/**
	 * Generate an ID that is unique in a table
	 * @method uniqueId
	 * @param {String} table The name of the table
	 * @param {String} field The name of the field to check for uniqueness.
	 *  You should probably have an index starting with this field.
	 * @param {Function} callback When an acceptable unique ID is generated, this function is called with the ID
	 *  as the first parameter.
	 * @param {Object} [where={}] You can indicate conditions here to limit the search for
	 *  an existing value. The result is an id that is unique within
	 *  a certain partition.
	 * @param {Object} [options={}] Optional hash used to override default options:
	 * @param {Number} [options.length=8] The length of the ID to generate, after the prefix.
	 * @param {String} [options.characters='abcdefghijklmnopqrstuvwxyz']  All the characters from which to construct the id
	 * @param {String} [options.prefix=''] The prefix to prepend to the unique id.
	 * @param {Function} [options.filter]
	 *     A function that will take the generated string and check it.
	 *     The filter function can modify the string by returning another string,
	 *     or simply reject the string by returning false, in which case
	 *     another string will be generated.
	 * @param {Function|Q.Event} [options.onError] Triggered if an error occurs
	 */
	dbm.uniqueId = function(table, field, callback, where, options) {
		where = where || {};
		options = options || {};
		var length = options.length || 8;
		var characters = options.characters || 'abcdefghijklmnopqrstuvwxyz';
		var prefix = options.prefix || '';
		var count = characters.length;
		var that = this;
		function attempt() {
			var id = prefix;
			for (var i=0; i<length; ++i) {
				id += characters[Math.floor(Math.random() * count)];
			}
			if (options.filter) {
				var ret = Q.handle(options.filter, this, [id, table, field, where, options]);
				if (ret === false) {
					attempt();
					return;
				} else if (ret) {
					id = ret;
				}
			}
			where[field] = id;
			that.SELECT(field, table).where(where).limit(1)
			.execute(function (err, rows) {
				if (err) {
					Q.handle(options.onError, that, [err]);
				} else if (!rows.length) {
					Q.handle(callback, that, [id]);
				} else {
					attempt(); // generate another id
				}
			});
		}
		attempt();
	};
	
	/**
	 * Returns a timestamp from a Date string
	 * @method fromDate
	 * @param {string} date The Date string that comes from the db
	 * @return {integer} The timestamp
	 */
	dbm.fromDate = function(date) {
		var year = date.substr(0, 4),
		    month = date.substr(5, 2),
		    day = date.substr(8, 2);
		return (new Date(year, month, day).getTime());
	};
    
	/**
	 * Returns a timestamp from a DateTime string
	 * @method fromDateTime
	 * @param {string} datetime The DateTime string that comes from the db
	 * @return {integer} The timestamp
	 */
	dbm.fromDateTime = function(datetime) {
		if (datetime.constructor === Date) {
			return datetime.getTime();
		}
		var year = datetime.substr(0, 4),
		    month = datetime.substr(5, 2),
		    day = datetime.substr(8, 2),
		    hour = datetime.substr(11, 2),
		    min = datetime.substr(14, 2),
		    sec = datetime.substr(17, 2);
		return (new Date(year, month, day, hour, min, sec, 0).getTime());
	};

	/**
	 * Returns a Date string to store in the database
	 * @method toDate
	 * @param {Date|String|integer} input The UNIX timestamp, e.g. from a strtotime function
	 * @return {String} in "yyyy-mm-dd hh:mm:ss" format
	 */
	dbm.toDate = function(input) {
		var date = Date.from(input);
		var year = date.getFullYear();
		var month = date.getMonth();
		var day = date.getDate();
		month = month < 10 ? '0'+month : month;
		day = day < 10 ? '0'+day : day;
		return year + '-' + month + '-' + day;
	};

	/**
	 * Returns a DateTime string to store in the database
	 * @method toDateTime
	 * @param {Date|string|integer} input a standard UNIX timestamp
	 * @return {String} in "yyyy-mm-dd hh:mm:ss" format
	 */
	dbm.toDateTime = function(input) {
		var date = Date.from(input);
		var year = date.getFullYear();
		var month = date.getMonth()+1;
		var day = date.getDate();
		var hours = date.getHours();
		var minutes = date.getMinutes();
		var seconds = date.getSeconds();
		month = month < 10 ? '0'+month : month;
		day = day < 10 ? '0'+day : day;
		hours = hours < 10 ? '0'+hours : hours;
		minutes = minutes < 10 ? '0'+minutes : minutes;
		seconds = seconds < 10 ? '0'+seconds : seconds;
		return year + '-' + month + '-' + day + ' ' + hours + ':' + minutes + ':' + seconds;
	};
	
	var _dbtime = null,
	    _nodetime = null;
	
	/**
	 * Returns the timestamp the db server would have, based on synchronization
	 * @method timestamp
	 * @param {Function} callback receives (err, timestamp)
	 * @return {integer}
	 */
	dbm.getCurrentTimestamp = function (callback) {
		if (_dbtime) {
			return callback(null, _dbtime + Math.round(Date.now() - _nodetime));
		}
		var time1 = Date.now();
		dbm.SELECT('CURRENT_TIMESTAMP ct', '').execute(function (err, rows) {
			if (err) {
				return callback(err);
			}
			if (!rows || !rows[0]) {
				return callback("No results returned");
			}
			_dbtime = dbm.fromDateTime(rows[0].fields.ct);
			var time2 = Date.now();
			_nodetime = (time1 + time2) / 2;
			callback(null, _dbtime + Math.round(time2 - _nodetime));
		});
	};

}

Q.makeEventEmitter(Db_Mysql, true);
module.exports = Db_Mysql;