Show:

File: platform/classes/Db/Query/Mysql.js

/**
 * @module Db
 */

var Q = require('Q');
var Db = Q.require('Db');

var util = require('util');

var _valueCounter = 1;
var _HASH_LEN = 7;

/**
 * Class implements MySQL query
 * @class Mysql
 * @namespace Db.Query
 * @constructor
 * @param mysql Db.Mysql
 * @param {Number} type One of the TYPE_* constants in Db.Query
 * @param {String|Object} query A sql query (for raw queries) or an associative array of clauses
 * @param {Object|Array} parameters The parameters to add to the query right away (to be bound when executing). Values corresponding to numeric keys replace question marks, while values corresponding to string keys replace ":key" placeholders, in the SQL.
 * @param {String} table The table operated with query
 */
var Query_Mysql = function(mysql, type, clauses, parameters, table) {
	Db.Query.apply(this, arguments);
	var mq = this;

	/**
	 * Criteria used for sharding the query
	 * @property criteria
	 * @type object
	 */
	mq.criteria = {};
	/**
	 * The table operated with query
	 * @property table
	 * @type string
	 */
	mq.table = table;

	// and now, for sharding
	mq.parameters = {};
	if (parameters) {
		for (var k in parameters) {
			var p = parameters[k];
			if (p instanceof Db.Expression) {
				Q.extend(mq.parameters, p.parameters);
			} else {
				mq.parameters[k] = p;
			}
		}
	}
	if (typeof parameters === 'object') {
		this.criteria = Q.copy(this.parameters);
	}

	/**
	 * Executes the query against a database connection
	 * Connects to one or more shard(s) as necessary.
	 * @param {function} callback This function is called when the queries have all completed.
	 *  It is passed the following arguments:
	 * * errors: an Object. If there were any errors, it will contain shardName: error pairs
	 * * results: an array of results merged from all the shards (for SELECT queries)
	 *    for INSERT queries results contains the value of LAST_INSERT_ID()
	 * * fields: an array of fields merged from all the shards (for SELECT queries)
	 *  It is passed an object whose keys are the shard names and the values
	 *  are arrays of [err, results, fields] as returned from the mysql module.
	 * @method execute
	 * @param {Object} [options={}] You can override the following options:
	 * @param {boolean} [options.plain=false]
	 *    If true, returns array of plain object instead of Db.Row instances
	 * @param {boolean} [options.raw=false]
	 *    If true, or if the query type is Db.Query.TYPE_RAW, the callback
	 *    will be passed an object of pairs representing the results returned
	 *    from the mysql query on each shard. Note that the results array will
	 *    contain raw objects of the form "{fieldName: fieldValue};",
	 *    and not objects which have Db.Row mixed in.
	 * @param {String|Array|Object} [options.shards]
	 *    This option will bypass the usual sharding calculations.
	 *    You can pass a string here, which will be used to run the query
	 *    on this shard. Or pass an array of shard names. Or you can 
	 *    specify custom query objects as {shardName: query}.
	 */
	mq.execute = function(callback, options) {
		var shardName, connection = this.db.connName;
		options = options || {};
		var shards = options.shards;
		if (typeof shards === 'string') {
			shardName = shards;
			shards = {};
			shards[shardName] = mq;
		} else if (Q.isArrayLike(shards)) {
			var shards2 = {};
			for (var i=0, l=shards.length; i<l; ++i) {
				shards[ shards[i] ] = mq;
			}
			shards = shards2;
		}
		var queries = shards || this.shard(options.indexes);
		var self = this;
		if (queries["*"]) {
			var shardNames = Q.Config.get(['Db', 'connections', connection, 'shards'], {'': ''});
			var q = queries["*"];
			for (shardName in shardNames) {
				queries[shardName] = q;
			}
			delete queries['*'];
		}
		var p = new Q.Pipe(Object.keys(queries), function (params, subjects) {
			// all the queries have completed
			// report the results in an object whose keys are the shard names
			// and whose values are arrays of the form [err, rows, fields]
			if (options.raw || mq.type === Db.Query.TYPE_RAW) {
				callback && callback(params);
				return;
			}

			var temp={}, k, res = 0;
			if (mq.type !== Db.Query.TYPE_SELECT) {
				for (k in params) {
					if (params[k][0])
						temp[k] = params[k][0];
					else if (mq.type === Db.Query.TYPE_INSERT)
						res = params[k][1].insertId;
					else if (mq.type === Db.Query.TYPE_DELETE)
						res += params[k][1].affectedRows;
				}
				if (callback) {
					if (Object.keys(temp).length) callback(temp, null);
					else callback(null, mq.type === Db.Query.TYPE_INSERT && !res ? null : res);
				}
				return;
			}
			
			var err={}, results=[], results2=[], rowClass;
			var i, pk, pk2, f;
			for (k in params) {
				pk = params[k];
				if (pk[0]) {
					err[k] = pk[0];
				}
				if (pk[1]) {
					for (i=0; i<pk[1].length; ++i) {
						results.push(pk[1][i]);
					}
				}
				if (pk2 = pk[2]) {
					for (f in pk2) {
						temp[pk2[f].name] = 1;
					}
				}
			}
			var fields = Object.keys(temp);
			if (self.className) {
				rowClass = Q.require( self.className.split('_').join('/') );
				for (i=0; i<results.length; ++i) {
					if (options.plain) {
						results2.push(results[i]);
					} else {
						var row = rowClass.newRow
							? rowClass.newRow(results[i], true)
							: new rowClass(results[i], true)
						results2.push(row);
					}
				}
			} else {
				for (i=0; i<results.length; ++i) {
					results2.push({ fields: results[i] });
				}
			}
			callback && callback(Object.keys(err).length ? err : null, results2, fields);
		});
		var upcoming = Q.Config.get(['Db', 'upcoming', connection], false);

		for (shardName in queries) {
			if (upcoming && queries[shardName].type !== Db.Query.TYPE_SELECT && queries[shardName].type !== Db.Query.TYPE_RAW) {
				if (upcoming.block && shardName === upcoming.shard) {
					var err = new Q.Exception("Shard '"+shardName+"' for connection '"+connection+"' is temporary blocked for writing");
					/**
					 * Database error
					 * @event error
					 * @param {Error} error
					 *	The error object
					 * @param {Db.Query.Mysql} mq
					 *	Db.Query.Mysql object which caused an error
					 */
					mq.db.emit('error', err, mq);
					cb(err);
					return;
				}
			}
			_queryShard(queries[shardName], shardName, p.fill(shardName));
		}
		function _queryShard (query, shardName, cb) {
			query.getSQL(function (sql, connection) {
				Db.emit('execute', query, sql, connection);
				function _doTheQuery () {
					var a = arguments;
					_queryConnection(query, sql, connection, function (err) {
						if (!sql) return _doTheCallback();
						var t = query, a = arguments;
						if (!err && query.clauses['COMMIT']) {
							connection.query('COMMIT;', _doTheCallback);
						} else {
							_doTheCallback();
						}
						function _doTheCallback() {
							if (!options.indexes) {
								switch (query.type) {
									case Db.Query.TYPE_SELECT:
										// SELECT queries don't need to be logged
									case Db.Query.TYPE_RAW:
										// Raw queries are run on shard '' - i.e. main db only
										// actually, raw query may get here only on initial sharding
										// when sharding has started raw queries are never run on shard
										break;
									default:
										var i, k, table;
										if (upcoming && shardName === upcoming.shard) {
											table = query.table;
											for (k in query.replacements) {
												table = table.replace(new RegExp(k, 'g'), query.replacements[k]);
											}
											if (table !== upcoming.dbTable) break;
											// send query to log
											var sql_template = query.getSQL();
											connection.query("SELECT CURRENT_TIMESTAMP", function (err, res) {
												var date;
												if (err) {
													date = new Date(); // backup solution
													date = date.getFullYear()+'-'+(date.getMonth()+1)+'-'+date.getMonth()+' '+
														date.getHours()+':'+date.getMinutes()+':'+date.getSeconds();
												}
												else date = res['CURRENT_TIMESTAMP'];

												var transaction =
													(query.clauses['COMMIT'] ? 'COMMIT' :
													(query.clauses['BEGIN'] ? 'START TRANSACTION' :
													(query.clauses['ROLLBACK'] ? 'ROLLBACK' : null)));

												if (transaction && transaction !== "COMMIT") {
													Q.Utils.sendToNode({
														'Q/method': 'Db/Shards/log',
														'shards': Object.keys(query.shard(upcoming.indexes[upcoming.table])),
														'sql': transaction+';'
													}, Q.Config.get(['Db', 'internal', 'sharding', 'logServer'], null));
												}

												Q.Utils.sendToNode({
													'Q/method': 'Db/Shards/log',
													'shards': Object.keys(query.shard(upcoming.indexes[upcoming.table])),
													'sql': sql_template.replace('CURRENT_TIMESTAMP', date).replace(/[\n]+/g, ' ').replace(/(^[\s]+|[\s]+$)/g, '')
												}, Q.Config.get(['Db', 'internal', 'sharding', 'logServer'], null));

												if (transaction && transaction === "COMMIT") {
													Q.Utils.sendToNode({
														'Q/method': 'Db/Shards/log',
														'shards': Object.keys(query.shard(upcoming.indexes[upcoming.table])),
														'sql': transaction+';'
													}, Q.Config.get(['Db', 'internal', 'sharding', 'logServer'], null));
												}
											});
										}
										break;
								}
							}
							cb.apply(query, a);
						}
					});
				}
				if (query.clauses['BEGIN']) {
					connection.query('START TRANSACTION;', _doTheQuery);
				} else if (query.clauses['ROLLBACK']) {
					connection.query('ROLLBACK;', _doTheQuery);
				} else {
					_doTheQuery();
				}
			}, shardName);
		}
		function _queryConnection (query, sql, connection, cb) {
			if (!sql) return cb(null);
			Db.emit('query', query, sql, connection);
			if (sql.indexOf('(,') >= 0) {
				debugger;
			}
			connection.query(sql, function(err, rows, fields) {
				if (err) {
					err.message += "\nQuery was:\n"+mq;
					mq.db.emit('error', err, mq);
				}
				cb(err, rows, fields, sql, connection);
			});
		}
	};
	
	/**
	 * Creates a query to select fields from one or more tables.
	 * @method SELECT
	 * @param {string|object} fields The fields as strings, or associative array of {alias: field};
	 * @param {string|object} tables The tables as strings, or associative array of {alias: table};
	 * @param {boolean} [repeat=false] If tables is an array, and select() has
	 *  already been called with the exact table name and alias
	 *  as one of the tables in that array, then
	 *  this table is not appended to the tables list if
	 *  repeat is false. Otherwise it is.
	 *  This is really just for using in your hooks.
	 * @return {Db.Query.Mysql} The resulting query object.
	 * @chainable
	 */
	mq.SELECT = function (fields, tables, repeat) {
		var as = ' '; // was: ' AS ', but now we made it more standard SQL
		var column, alias, fields_list, prev_tables_list;
		var table, table_string, tables_array, prev_tables_array;
		var that = this;
		if (typeof fields === 'object') {
			fields_list = [];
			for (alias in fields) {
				column = fields[alias];
				if (isNaN(alias))
					fields_list.push(column + as + alias);
				else
					fields_list.push(column);
			}
			fields = fields_list.join(', ');
		}
		if (typeof fields !== 'string') {
			throw new Q.Exception("The fields to select need to be specified correctly.");
		}

		this.clauses['SELECT'] = this.clauses['SELECT'] ? this.clauses['SELECT'] + ", " + fields : fields;
		if (!tables) {
			return this;
		}

		function get_table_string(table, alias) {
			var table_string;
			if (table && table.typename === "Db.Expression") {
				// this is a subquery
				table_string = "(" + table + ")";
				Q.extend(that.parameters, table.parameters);
			} else {
				table_string = table.trim();
			}
			if (typeof alias !== "undefined" && alias) {
				table_string += as + alias;
			}
			return table_string;
		}
		
		if (!tables) {
			return this;
		}
		
		tables_array = [];
		switch (Q.typeOf(tables)) {
			case "Db.Expression":
				tables_array.push(get_table_string(tables));
				break;
			case "object":
				prev_tables_array = this.clauses['FROM'] ? this.clauses['FROM'] : [];
				for (alias in tables) {
					table_string = get_table_string(tables[alias], alias);
					if (!repeat && prev_tables_array.indexOf(table_string) >= 0) {
						continue;
					}
					tables_array.push(table_string);
				}
				break;
			case "string":
				tables_array = [tables];
				break;
			case "array":
				tables_array = tables;
				break;
			default:
				throw new Exception("Db.Query.Mysql: tables must be string, array or object");
		}
		this.clauses['FROM'] = this.clauses['FROM'] ? this.clauses['FROM'].concat(tables_array) : tables_array;

		return this;
	};

	/**
	 * Joins another table to use in the query
	 * @method join
	 * @param {string} table The name of the table. May also be "name AS alias".
	 * @param {Db.Expression|object|string} condition The condition to join on. Thus, JOIN table ON (condition)
	 * @param {string} [join_type='INNER'] The string to prepend to JOIN, such as 'INNER', 'LEFT OUTER', etc.
	 * @return {Db.Query.Mysql} The resulting Db.Query object
	 * @throws {Q.Exception} If JOIN clause does not belong in this context or the JOIN condition specified incorrectly.
	 * @chainable
	 */
	mq.join = function (table, condition, join_type) {
		if (!join_type) {
			join_type = "INNER";
		}
		switch (this.type) {
			case Db.Query.TYPE_SELECT:
			case Db.Query.TYPE_UPDATE:
				break;
			case Db.Query.TYPE_DELETE:
				if (!this.after['FROM']) break;
			default:
				throw new Q.Exception("the JOIN clause does not belong in this context.");
		}

		var expr, value;
		if (typeof condition === 'object') {
			condition = criteria_internal(this, condition);
		} else if (condition && condition.typename === "Db.Expression") {
			Q.extend(this.parameters, condition.parameters);
			condition = condition.toString();
		}
		if (typeof condition !== "string") {
			throw new Q.Exception("The JOIN condition needs to be specified correctly.");
		}
		
		var join = join_type + " JOIN " + table + " ON (" + condition + ")";
		
		this.clauses['JOIN'] = this.clauses['JOIN'] ? this.clauses['JOIN'] + " \n" + join : join;
		return this;
	};

	/**
	 * Adds a WHERE clause to a query
	 * @method where
	 * @param {Db.Expression|object|string} criteria An associative array of expression: value pairs.
	 *  The values are automatically turned into placeholders to be escaped later.
	 *  They can also be arrays, in which case they are placed into an expression of the form "key IN ('val1', 'val2')"
	 *  Or, this could be a Db.Expression object.
	 * @return {Db.Query.Mysql} The resulting Db.Query
	 * @throws {Q.Exception} If WHERE criteria specified incorrectly
	 * @chainable
	 */
	mq.where = function (criteria) {
		switch (this.type) {
			case Db.Query.TYPE_SELECT:
			case Db.Query.TYPE_UPDATE:
			case Db.Query.TYPE_DELETE:
				break;
			default:
				throw new Q.Exception("The WHERE clause does not belong in this context.");
		}
		
		// and now, for sharding
		if (typeof criteria === 'object') {
			this.criteria = Q.copy(criteria);
		}
		
		var ci = criteria_internal(this, criteria);
		if (typeof ci !== 'string') {
			throw new Q.Exception("The WHERE criteria need to be specified correctly.");
		}
		if (!ci) {
			return this;
		}

		this.clauses['WHERE'] = this.clauses['WHERE'] ? "(" + this.clauses['WHERE'] + ") AND (" + ci + ")" : ci;
			
		return this;
	};

	/**
	 * Adds to the WHERE clause, like this:  " ... AND (x OR y OR z)",
	 * where x, y and z are the arguments to this function.
	 * @method andWhere
	 * @param {Db.Expression|object|string} criteria An associative array of expression: value pairs.
	 *  The values are automatically turned into placeholders to be escaped later.
	 *  They can also be arrays, in which case they are placed into an expression of the form "key IN ('val1', 'val2')"
	 *  Or, this could be a Db.Expression object.
	 * @param {Db.Expression|object|string} or_criteria You can have any number of these, including zero.
	 * @return {Db.Query.Mysql} The resulting Db.Query object
	 * @chainable
	 */
	mq.andWhere = function (criteria, or_criteria) {
		switch (this.type) {
			case Db.Query.TYPE_SELECT:
			case Db.Query.TYPE_UPDATE:
			case Db.Query.TYPE_DELETE:
				break;
			default:
				throw new Q.Exception("The WHERE clause does not belong in this context.");
		}

		// and now, for sharding
		if (typeof criteria === 'object') {
			if (!this.criteria) {
				this.criteria = criteria;
			} else if (this.shardIndex()) {
				if (arguments.length > 1) {
					throw new Q.Exception("You can't use OR in your WHERE clause when sharding.");
				}
				Q.extend(this.criteria, criteria);
			}
		}

		var c_arr = [];
		var was_empty = true;
		var c; 
		for (var i = 0; i < arguments.length; ++i ) {
			c = criteria_internal(this, arguments[i]);
			if (typeof c !== 'string') {
				throw new Q.Exception("The WHERE criteria need to be specified correctly");
			}
			c_arr.push(c);
			if (c) {
				was_empty = false;
			}
		}
		if (was_empty) {
			return this;
		}
		
		var new_criteria = "(" + c_arr.join(") OR (") + ")";
		this.clauses["WHERE"] = "(" + this.clauses["WHERE"] + ") AND (" + new_criteria + ")";
		return this;
	};

	/**
	 * Adds to the WHERE clause, like this:  " ... OR (x AND y AND z)",
	 * where x, y and z are the arguments to this function.
	 * @method orWhere
	 * @param {Db.Expression|object|string} criteria An associative array of expression: value pairs.
	 *  The values are automatically turned into placeholders to be escaped later.
	 *  They can also be arrays, in which case they are placed into an expression of the form "key IN ('val1', 'val2')"
	 *  Or, this could be a Db.Expression object.
	 * @param {Db.Expression|object|string} and_criteria
	 * @return {Db.Query.Mysql} The resulting Db.Query object
	 * @chainable
	 */
	mq.orWhere = function (criteria, and_criteria) {
		switch (this.type) {
			case Db.Query.TYPE_SELECT:
			case Db.Query.TYPE_UPDATE:
			case Db.Query.TYPE_DELETE:
				break;
			default:
				throw new Q.Exception("The WHERE clause does not belong in this context.");
		}

		// and now, for sharding
		if (typeof criteria === 'object') {
			if (this.shardIndex() && this.criteria) {
				throw new Exception("You can't use OR in your WHERE clause when sharding.");
			}
		}

		var c_arr = [];
		var was_empty = true;
		var c;
		for (var i = 0; i < arguments.length; ++i ) {
			c = criteria_internal(this, arguments[i]);
			if (typeof c !== 'string') {
				throw new Q.Exception("The WHERE criteria need to be specified correctly");
			}
			c_arr.push(c);
			if (c) {
				was_empty = false;
			}
		}
		if (was_empty) {
			return this;
		}
		
		var new_criteria = "(" + c_arr.join(") AND (") + ")";
		this.clauses["WHERE"] = "(" + this.clauses["WHERE"] + ") OR (" + new_criteria + ")";
		return this;
	};
	
	/* *
	 * This function is specifically for adding criteria to query for sharding purposes.
	 * It doesn't affect the SQL generated for the query.
	 * @method criteria
	 * @param criteria Object An associative array of expression => value pairs.
	 */
	/*mq.criteria = function(criteria) {
		if (typeof criteria === 'object') {
			if (!this.criteria) {
				this.criteria = criteria;
			} else {
				Q.extend(this.criteria, criteria);
			}
		}
	};*/

	/**
	 * Adds a GROUP BY clause to a query
	 * @method groupBy
	 * @param expression {Db.Expression|string} A string or Db.Expression with the expression to group the results by.
	 * @return {Db.Query.Mysql} The resulting Db.Query object
	 * @chainable
	 */
	mq.groupBy = function (expression) {
		switch (this.type) {
			case Db.Query.TYPE_SELECT:
				break;
			default:
				throw new Q.Exception("The GROUP BY clause does not belong in this context.");
		}

		if (expression && expression.typename === "Db.Expression") {
			Q.extend(this.parameters, expression.parameters);
			expression = expression.toString();
		}
		if (typeof expression !== 'string') {
			throw new Q.Exception("The GROUP BY expression has to be specified correctly.");
		}
		this.clauses['GROUP BY'] = this.clauses['GROUP BY'] ? this.clauses['GROUP BY'] + ", " + expression : expression;
		return this;
	};
	
	/**
	 * Adds a HAVING clause to a query
	 * @method having
	 * @param {Db.Expression|object|string} criteria An associative array of expression => value pairs.
	 *  The values are automatically escaped using PDO placeholders.
	 *  Or, this could be a Db.Expression object.
	 * @return {Db.Query.Mysql} The resulting Db.Query object
	 * @chainable
	 */
	mq.having = function (criteria) {
		switch (this.type) {
			case Db.Query.TYPE_SELECT:
				break;
			default:
				throw new Q.Exception("The  clause does not belong in this context.");
		}

		if (!this.clauses['GROUP BY']) {
			throw new Q.Exception("Don't call having() when you haven't called groupBy() yet");
		}

		var ci = criteria_internal(this, criteria);
		if (typeof ci !== 'string') {
			throw new Q.Exception("The HAVING criteria need to be specified correctly.");
		}

		this.clauses['HAVING'] = this.clauses['HAVING'] ? "(" + this.clauses['HAVING'] + ") AND (" + ci + ")" : ci;

		return this;
	};

	/**
	 * Adds a ORDER BY clause to a query
	 * @method orderBy
	 * @param expression {Db.Expression|string} A string or Db.Expression with the expression to order the results by.
	 * @param [ascending=false] Boolean If false, sorts results as ascending, otherwise descending.
	 * @return {Db.Query.Mysql} The resulting Db.Query object
	 * @chainable
	 */
	mq.orderBy = function (expression, ascending) {
		switch (this.type) {
			case Db.Query.TYPE_SELECT:
			case Db.Query.TYPE_UPDATE:
				break;
			default:
				throw new Q.Exception("The ORDER BY clause does not belong in this context.");
		}

		if (expression && expression.typename === "Db.Expression") {
			Q.extend(this.parameters, expression.parameters);
			expression = expression.toString();
		}
		if (typeof expression !== 'string') {
			throw new Q.Exception("The ORDER BY expression has to be specified correctly.");
		}
		if (typeof ascending === 'boolean') {
			expression += ascending ? ' ASC' : ' DESC';
		} else if (typeof ascending === 'string') {
			if (ascending.toUpperCase() == 'DESC') {
				expression += ' DESC';
			} else {
				expression += ' ASC';
			}
		}
		this.clauses['ORDER BY'] = this.clauses['ORDER BY'] ? this.clauses['ORDER BY'] + ", " + expression : expression;
		return this;
	};

	/**
	 * Adds optional LIMIT and OFFSET clauses to the query
	 * @method limit
	 * @param {number} limit A non-negative integer showing how many rows to return
	 * @param {number} [offset=0] A non-negative integer showing what row to start the result set with.
	 * @return {Db.Query.Mysql} The resulting Db.Query object
	 * @chainable
	 */
	mq.limit = function(limit, offset) {
		if (limit == null) {
			return this;
		}
		if (isNaN(limit) || limit < 0 || Math.floor(limit) !== limit) {
			throw new Q.Exception("the limit must be a non-negative integer");
		}
		if (offset !== undefined && offset !== null) {
			if (isNaN(offset) || offset < 0 || Math.floor(offset) !== offset) {
				throw new Q.Exception("the offset must be a non-negative integer");
			}
		}
		switch (this.type) {
			case Db.Query.TYPE_SELECT:
				break;
			case Db.Query.TYPE_UPDATE:
			case Db.Query.TYPE_DELETE:
				if (offset !== undefined && offset !== null) {
					throw new Q.Exception("the LIMIT clause cannot have an OFFSET in this context");
				}
				break;
			default:
				throw new Q.Exception("The LIMIT clause does not belong in this context.");
		}

		if (this.clauses['LIMIT'])
			throw new Q.Exception("The LIMIT clause has already been specified.");

		this.clauses['LIMIT'] = "LIMIT " + limit;
		if (offset !== undefined && offset !== null) {
			this.clauses['LIMIT'] += " OFFSET " + offset;
		}

		return this;
	};
	
	/**
	 * Adds a SET clause to an UPDATE statement
	 * @method set
	 * @param {object} updates An associative array of column: value pairs.
	 *  The values are automatically escaped using PDO placeholders.
	 * @return {Db.Query.Mysql} The resulting Db.Query object
	 * @chainable
	 */
	mq.set = function (updates) {
		var expression = set_internal(this, updates);
		this.clauses['SET'] = this.clauses['SET'] ? this.clauses['SET'] + ", " + expression : expression;
		return this;
	};
	
	/**
	 * Adds an ON DUPLICATE KEY UPDATE clause to an INSERT statement.
	 * Use only with MySQL.
	 * @method onDuplicateKeyUpdate
	 * @param {object} updates An associative array of {column: value} pairs.
	 *  The values are automatically escaped using PDO placeholders.
	 * @return {Db.Query.Mysql} The resulting Db.Query object
	 * @chainable
	 */
	mq.onDuplicateKeyUpdate = function(updates) {
		updates = onDuplicateKeyUpdate_internal(this, updates);
		
		if (!this.clauses['ON DUPLICATE KEY UPDATE']) {
			this.clauses['ON DUPLICATE KEY UPDATE'] = updates; 
		} else {
			this.clauses['ON DUPLICATE KEY UPDATE'] += ", " + updates;
		}
		return this;
	};
	
	/**
	 * Works with SELECT queries to lock the selected rows.
	 * Use only with MySQL.
	 * @method lock
	 * @param {string} [type='FOR UPDATE'] Defaults to 'FOR UPDATE', but can also be 'LOCK IN SHARE MODE'
	 * @return {Db.Query.Mysql} The resulting Db.Query object
	 * @chainable
	 */
	mq.lock = function(type) {
		type = type || 'FOR UPDATE';
		switch (type.toUpperCase()) {
			case 'FOR UPDATE':
			case 'LOCK IN SHARE MODE':
				this.clauses['LOCK'] = type;
				break;
			default:
				throw new Exception("Incorrect type for MySQL lock");
		}
		return this;
	};
	
	/**
	 * Begins a transaction right before executing this query.
	 * The reason this method is part of the query class is because
	 * you often need the "where" clauses to figure out which database to send it to,
	 * if sharding is being used.
	 * @method begin
	 * @param {string} [lockType='FOR UPDATE'] Defaults to 'FOR UPDATE',
	 *   but can also be 'LOCK IN SHARE MODE', 
	 *   or set it to null to avoid adding a "LOCK" clause
	 * @chainable
	 */
	mq.begin = function(lockType)
	{
		if (lockType === undefined || lockType === true) {
			lockType = 'FOR UPDATE';
		}
		if (lockType) {
			this.lock(lockType);
		}
		this.clauses['BEGIN'] = 'START TRANSACTION';
		return this;
	};

	/**
	 * Commits transaction when query is executed
	 * Use only with MySQL.
	 * @method commit
	 * @return {Db.Query.Mysql} The resulting Db.Query object
	 * @chainable
	 */
	mq.commit = function() {
		this.clauses['COMMIT'] = 'COMMIT';
		return this;
	};

	/**
	 * Rolls back transaction when query is executed
	 * Use only with MySQL.
	 * @method rollback
	 * @param {string} [criteria=null] Pass this to target the rollback to the right shard.
	 * @return {Db.Query.Mysql} The resulting Db.Query object
	 * @chainable
	 */
	mq.rollback = function(criteria) {
		this.clauses['ROLLBACK'] = 'ROLLBACK';
		// and now, for sharding
		if (typeof criteria === 'object') {
			this.criteria = Q.copy(criteria);
		}
		return this;
	};

	function onDuplicateKeyUpdate_internal (query, updates) {
		if (query.type != Db.Query.TYPE_INSERT) {
			throw new Q.Exception("The ON DUPLICATE KEY UPDATE clause does not belong in this context.");
		}
		
		if (typeof updates === 'object') {
			var updates_list = [], field;
			for (field in updates) {
				var value = updates[field];
				if (value && value.typename === "Db.Expression") {
					Q.extend(query.parameters, value.parameters);
					updates_list.push(field + " = " + value);
				} else {
					updates_list.push(field + " = :_dupUpd_"+onDuplicateKeyUpdate_internal.i);
					query.parameters["_dupUpd_"+onDuplicateKeyUpdate_internal.i] = value;
					++ onDuplicateKeyUpdate_internal.i;
				}
			}
			updates = updates_list.join(", ");
		}
		if (typeof updates !== 'string')
			throw new Q.Exception("The ON DUPLICATE KEY updates need to be specified correctly.");
		
		return updates;
	}
	onDuplicateKeyUpdate_internal.i = 1;
	
	/**
	 * This function provides an easy way to provide additional clauses to the query.
	 * @method options
	 * @param {object} options An associative array of {key: value} pairs, where the key is
	 *  the name of the method to call, and the value is the array of arguments.
	 *  If the value is not an array, it is wrapped in one.
	 * @return {Db.Query.Mysql} The resulting Db.Query object
	 * @chainable
	 */
	mq.options = function(options) {
		if (!options) {
			return this;
		}
		for (var key in options) {
			var value = options[key];
			if (typeof(this[key]) === 'function') {
				if (Q.typeOf(value) !== 'array') {
					value = [value];
				}
				var method = this[key];
				method.apply(this, value);
			}
		}
		return this;
	};

	
	/**
	 * Builds the query from the clauses
	 * @method build
	 * @param {object} [options={}]
	 * @return {string}
	 */
	mq.build = function(options) {
		var sql = '', select, from, join, where, groupBy, having, orderBy, limit, lock,
			into, values, afterValues, onDuplicateKeyUpdate,
			update, set, i;
		switch (this.type) {
			case Db.Query.TYPE_RAW:
				sql = this.clauses['RAW'] || '';
				break;
			case Db.Query.TYPE_SELECT:
				// SELECT
				select = this.clauses['SELECT'] || '*';
				if (this.after['SELECT']) {
					select += " " + this.after['SELECT'];
				}
				// FROM
				from = (this.clauses['FROM'] || []).join(', ');
				// if (!from)
				// 	throw new Q.Exception("missing FROM clause in DB query.");
				if (this.after['FROM']) {
					from += " " + this.after['FROM'];
				}
				// JOIN
				join = this.clauses['JOIN'] || '';
				if (this.after['JOIN']) {
					join += " " + this.after['JOIN'];
				}
				// WHERE
				where = this.clauses['WHERE'] ? 'WHERE ' + this.clauses['WHERE'] : '';
				if (this.after['WHERE']) {
					where += " " + this.after['WHERE'];
				}
				// GROUP BY
				groupBy = this.clauses['GROUP BY'] ? "GROUP BY " + this.clauses['GROUP BY'] : '';
				if (this.after['GROUP BY']) {
					groupBy += " " + this.after['GROUP BY'];
				}
				// HAVING
				having = this.clauses['HAVING'] ? "HAVING " + this.clauses['HAVING'] : '';
				if (this.after['HAVING']) {
					having += " " + this.after['HAVING'];
				}
				// ORDER BY
				orderBy = this.clauses['ORDER BY'] ? "ORDER BY " + this.clauses['ORDER BY'] : '';
				if (this.after['ORDER BY']) {
					orderBy += " " + this.after['ORDER BY'];
				}
				// LIMIT
				limit = this.clauses['LIMIT'] || '';
				if (this.after['LIMIT']) {
					limit += " " + this.after['LIMIT'];
				}
				// LOCK
				lock = this.clauses['LOCK'] || '';
				if (this.after['LOCK']) {
					lock +=  " " + this.after['LOCK'];
				}
				sql = "SELECT " + select +
					(from ? "\nFROM " + from : '') +
					"\n" + join +
					"\n" + where +
					"\n" + groupBy +
					"\n" + having +
					"\n" + orderBy +
					"\n" + limit +
					"\n" + lock;
				break;
			case Db.Query.TYPE_INSERT:
				// INTO
				if (!this.clauses['INTO'])
					throw new Q.Exception("missing INTO clause in DB query.");
				into = this.clauses['INTO'] || '';
				if (into) {
					if (!this.clauses['FIELDS']) {
						throw new Q.Exception("missing FIELDS clause in DB query.");
					}
					into += '(' + this.clauses['FIELDS'] + ')';
				}
				if (this.after['INTO']) {
					into += " " + this.after['INTO'];
				}
				values = this.clauses['VALUES'] || '';
				afterValues = this.after['VALUES'] || '';
				onDuplicateKeyUpdate = this.clauses['ON DUPLICATE KEY UPDATE'] ?
					'ON DUPLICATE KEY UPDATE '  + this.clauses['ON DUPLICATE KEY UPDATE'] : '';
				sql = "INSERT INTO " + into +
					"\nVALUES (" + values + ")" +
					"\n" + afterValues +
					"\n" + onDuplicateKeyUpdate;
				break;
			case Db.Query.TYPE_UPDATE:
				// UPDATE
				if (!this.clauses['UPDATE'])
					throw new Q.Exception("Missing UPDATE tables clause in DB query.");
				if (!this.clauses['SET'])
					throw new Q.Exception("missing SET clause in DB query.");
				update = this.clauses['UPDATE'] || '';
				if (this.after['UPDATE']) {
					update += " " + this.after['UPDATE'];
				}
				// JOIN
				join = this.clauses['JOIN'] || '';
				if (this.after['JOIN']) {
					join += " " + this.after['JOIN'];
				}
				// SET
				set = this.clauses['SET'] || '';
				if (this.after['SET']) {
					set += " " + this.after['SET'];
				}
				// WHERE
				where = this.clauses['WHERE'] ? 'WHERE ' + this.clauses['WHERE'] : 'WHERE 1';
				if (this.after['WHERE']) {
					where += " " + this.after['WHERE'];
				}
				// LIMIT
				limit = this.clauses['LIMIT'] || '';
				if (this.after['LIMIT']) {
					limit += " " + this.after['LIMIT'];
				}
				sql = "UPDATE " + update +
					"\n" + join +
					"\nSET " + set +
					"\n" + where +
					"\n" + limit;
				break;
			case Db.Query.TYPE_DELETE:
				// DELETE
				if (!this.clauses['FROM'])
					throw new Q.Exception("missing FROM clause in DB query.");
				from = this.clauses['FROM'] || '';
				if (this.after['FROM']) {
					from += " " + this.after['FROM'];
				}
				// JOIN
				join = this.clauses['JOIN'] || '';
				if (this.after['JOIN']) {
					join += " " + this.after['JOIN'];
				}
				// WHERE
				where = this.clauses['WHERE'] ? 'WHERE ' + this.clauses['WHERE'] : 'WHERE 1';
				if (this.after['WHERE']) {
					where += " " + this.after['WHERE'];
				}
				// LIMIT
				limit = this.clauses['LIMIT'] || '';
				if (this.after['LIMIT']) {
					limit += " " + this.after['LIMIT'];
				}
				sql = "DELETE FROM " + from +
					"\n" + join +
					"\n" + where +
					"\n" + limit;
				break;
			case Db.Query.TYPE_ROLLBACK:
				break;
			default:
				throw new Q.Exception("Unknown query type "+this.type);
				break;
		}
		return sql;
	};
	
	/**
	 * Create mysql.Connection and connects to the database table
	 * @method reallyConnect
	 * @param {function} callback The callback is fired after connection is complete. mysql.Connection is passed as argument
	 * @param {string} [shardName=''] The name of the shard to connect
	 * @param {object} modifications={} Additional modifications to table information. If supplied override shard modifications
	 */
	mq.reallyConnect = function(callback, shardName, modifications) {
		return this.db.reallyConnect(callback, shardName, modifications);
	};

	/**
	 * Get string representation of the query
	 * @method valueOf
	 * @return {string} SQL statement
	 */
	/**
	 * Get string representation of the query
	 * @method toString
	 * @return {string} SQL statement
	 */
	mq.valueOf = mq.toString = function() {
		try {
			return this.build();
		} catch (e) {
			return '*****' + e;
		}
	};
	
	/**
	 * Inserts a custom clause after a particular clause
	 * @method after
	 * @param {string} after The name of the standard clause to add after, such as FROM or UPDATE
	 * @param {string} clause The text of the clause to add
	 * @return {Db.Query.Mysql} The resulting Db.Query object
	 * @chainable
	 */
	mq.after = function(after, clause) {
		if (clause) {
			this.after = this.after[after] ? this.after + ' ' + clause : clause;
		}
		return this;
	};
	
	/**
	 * Gets a clause from the query
	 * @method getClause
	 * @param {string} clause Name of the clause
	 * @param {boolean} with_after Also get the sql after the clause, if any
	 * @return {string|array} If with_after is true, returns [clause, after]
	 *  Otherwise just returns clause
	 */
	mq.getClause = function(clause_name, with_after) {
		var clause = this.clauses[clause_name] || '';
		if (!with_after) {
			return clause;
		}
		var after = this.after[clause_name] || '';
		return [clause, after];
	};

	/**
	 * Gets the SQL that would be executed with the execute() method.
	 * @method getSQL
	 * @param {function} callback This callback is passed the resulting SQL string.
	 * @param {string} [shardName=''] The name of the shard on which to execute getSQL.
	 * @return {Db.Query|string} Returns the db query again, for chainable interface.
	 *	If "callback" is not defined returns string representation of the query
	 */
	mq.getSQL = function (callback, shardName) {
		var mq = this;
		delete mq.replacements['{\\$prefix}'];
		delete mq.replacements['{\\$dbname}'];
		var repres = mq.build();
		var keys = Object.keys(mq.parameters);
		keys.sort(replaceKeysCompare);
		function makeSQL(connection) {
			var k, key, value, value2, values3 = {};
			for (k in keys) {
				key = keys[k];
				value = mq.parameters[key];
				if (value instanceof Buffer) {
					value = value.toString();
				}
				if (value === null || value === undefined) {
					value2 = "NULL";
				} else if (value && value.typename === "Db.Expression") {
					value2 = value;
				} else if (value instanceof Date) {
					value2 = '"'+mq.db.toDateTime(value.getTime())+'"';
				} else {
					value2 = connection.escape(value);
				}
				if (Q.isInteger(key)) {
					values3[key] = value2;
				} else {
					repres = repres.replace(":"+key, value2);
				}
			}
			var i = 0;
			if (!Q.isEmpty(values3)) {
				repres = repres.replace( /\?/g, function() {
					var v = values3[i++];
					if (v === undefined) {
						console.log(repres, i);
					}
					return v !== undefined ? v : '?';
				});
			}
			if (callback)
				Q.extend(mq.replacements, {'{\\$prefix}': mq.db.prefix(), '{\\$dbname}': mq.db.dbname()});
			for (k in mq.replacements) {
				repres = repres.replace(new RegExp(k, 'g'), mq.replacements[k]);
			}
			if (callback) {
				callback.call(mq, repres, connection);
			}
		}
		if (callback) {
			this.reallyConnect(makeSQL, shardName);
			return this;
		} else {
			makeSQL(this.reallyConnect());
			return repres;
		}
	};
};

function replaceKeysCompare(a, b) {
	var aIsInteger = Q.isInteger(a);
	var bIsInteger = Q.isInteger(b);
	if (aIsInteger && !bIsInteger) {
		return -1;
	}
	if (bIsInteger && !aIsInteger) {
		return 1;
	}
	if (aIsInteger && bIsInteger) {
		return parseInt(a) - parseInt(b);
	}
	return b.length-a.length;
}

function criteria_internal (query, criteria) {
	var criteria_list, expr, parts, columns, value, values, v, i, j, k, vl, vl2, pl;
	if (typeof criteria === 'object') {
		criteria_list = [];
		for (expr in criteria) {
			value = criteria[expr];
			if (value instanceof Buffer) {
				value = value.toString();
			}
			parts = expr.split(',').map(function (str) {
				return str.trim();
			});
			pl = parts.length;
			if (pl > 1) {
				if (!Q.isArrayLike(value)) {
					throw new Q.Exception("Db.Query.Mysql: The value should be an array of arrays");
				}
				var columns = [];
				for (k=0; k<pl; ++k) {
					var column = parts[k];
					columns.push(column);
					if (!query.criteria[column]) {
						query.criteria[column] = []; // sharding heuristics
					}
				}
				var list = [];
				for (j=0, vl=value.length; j<vl; ++j) {
					if (!Q.isArrayLike(value[j])) {
						var json = JSON.stringify(value[j]);
						throw new Q.Exception(
							"Db.Query.Mysql: Value " + json
								+ " needs to be an array"
						);
					}
					if (value[j].length != pl) {
						throw new Q.Exception(
							"Db_Query_Mysql: Arrays should have " + pl +
							" elements to match " + expr
						);
					}
					var vector = [];
					for (k=0, vl2=value[j].length; k<vl2; ++k) {
						vector.push(":_criteria_" + _valueCounter);
						query.parameters["_criteria_" + _valueCounter] = value[j][k];
						_valueCounter = (_valueCounter + 1) % 1000000;
						query.criteria[column].push(value[j][k]); // sharding heuristics
					}
					list.push('(' + vector.join(',') + ')');
				}
				if (list.length) {
					var lhs = '(' + columns.join(',') + ')';
					var rhs = '(\n' + list.join(',\n') + '\n)';
					criteria_list.push(lhs + ' IN ' + rhs);
				} else {
					criteria_list.push('FALSE');
				}
			} else if (value === undefined) {
				// do not add this value to criteria
			} else if (value == null) {
				criteria_list.push( "ISNULL(" + expr + ")");
			} else if (value && value.typename === "Db.Expression") {
				Q.extend(query.parameters, value.parameters);
				if (/\W/.test(expr.substr(-1))) {
					criteria_list.push( "" + expr + "(" + value + ")" );
				} else {
					criteria_list.push( "" + expr + " = (" + value + ")");
				}
			} else if (Q.isArrayLike(value)) {
				var valueList = '';
				if (value.length) {
					values = [];
					for (i=0; i<value.length; ++i) {
						values.push(":_criteria_" + _valueCounter);
						query.parameters["_criteria_" + _valueCounter] = value[i];
						_valueCounter = (_valueCounter + 1) % 1000000;
					}
					valueList = values.join(',');
				}
				if (/\W/.test(expr.substr(-1))) {
					criteria_list.push( "" + expr + "(" + valueList + ")" );
				} else if (value.length === 0) {
					criteria_list.push("FALSE"); // since value array is empty
				} else {
					criteria_list.push( "" + expr + " IN (" + valueList + ")");
				}
			} else if (value && value.typename === 'Db.Range') {
				if (value.min != null) {
					var c_min = value.includeMin ? ' >= ' : ' > ';
					criteria_list.push( "" + expr + c_min + ":_criteria_" + _valueCounter );
					query.parameters["_criteria_" + _valueCounter] = value.min;
					_valueCounter = (_valueCounter + 1) % 1000000;
				}
				if (value.max != null) {
					var c_max = value.includeMax ? ' <= ' : ' < ';
					criteria_list.push( "" + expr + c_max + ":_criteria_" + _valueCounter );
					query.parameters["_criteria_" + _valueCounter] = value.max;
					_valueCounter = (_valueCounter + 1) % 1000000;
				}
			} else {
				var eq = /\W/.test(expr.substr(-1)) ? '' : ' = ';
				criteria_list.push( "" + expr + eq + ":_criteria_" + _valueCounter );
				query.parameters["_criteria_" + _valueCounter] = value;
				_valueCounter = (_valueCounter + 1) % 1000000;
			}
		}
		criteria = criteria_list.join(" AND ");
	} else if (criteria && criteria.typename === "Db.Expression") {
		Q.extend(query.parameters, criteria.parameters);
		criteria = criteria.toString();
	}

	return criteria;
}

function set_internal (query, updates) {
	switch (query.type) {
		case Db.Query.TYPE_UPDATE:
			break;
		default:
			throw new Q.Exception("Db.Query.Mysql set_internal: The SET clause does not belong in this context.");
	}
	if (typeof updates === 'object') {
		var updates_list = [];
		for (var field in updates) {
			var value = updates[field];
			if (value && value.typename === "Db.Expression") {
				Q.extend(query.parameters, value.parameters);
				updates_list.push(field + " = " + value);
			} else {
				updates_list.push(field + " = :_set_"+_valueCounter);
				query.parameters["_set_"+_valueCounter] = value;
				_valueCounter = (_valueCounter + 1) % 1000000;
			}
		}
		updates = (updates_list.length) ? updates_list.join(", \n") : "";
	}
	if (typeof updates !== 'string') {
		throw new Q.Exception("Db.Query.Mysql set_internal: The SET updates need to be specified correctly.");
	}
	return updates;
}

Q.mixin(Query_Mysql, Db.Query);

module.exports = Query_Mysql;