<?php
/**
* @module Db
*/
interface Db_Query_Interface
{
    /**
     * Contract that a database adapter's query class must fulfil
     * in order to implement the Db class.
     * @class Db_Query_Interface
     * @constructor
     * @param {Db_Interface} $db The database connection
     * @param {integer} $type The type of the query. See class constants beginning with TYPE_ .
     * @param {array} $clauses The clauses to add to the query right away
     * @param {array} $parameters The parameters to add to the query right away (to be bound when executing)
     */
    // The constructor is documented above but left commented out,
    // so the interface itself does not enforce this signature:
    //function __construct (
    //    Db_Interface $db,
    //    $type,
    //    array $clauses = array(),
    //    array $parameters = array())

    /**
     * Builds the query from the clauses
     * @method build
     */
    public function build();

    /**
     * Builds the query and returns the string that would
     * be passed to $pdo->prepare().
     * If building results in an exception, the returned string
     * contains the exception instead.
     * @method __toString
     */
    public function __toString();

    /**
     * Gets the SQL that would be executed with the execute() method.
     * @method getSQL
     * @param {callable} [$callback=null] When omitted, the generated SQL string is returned.
     *  When given, $callback is called with the SQL string and
     *  $this is returned, to keep the interface chainable.
     * @return {string|Db_Query} Depends on whether $callback is set or not.
     */
    public function getSQL($callback = null);

    /**
     * Merges additional replacements over the default replacement array,
     * which is currently just
     * @example
     *     array (
     *         '{$prefix}' => $conn['prefix']
     *     )
     *
     * The replacements array is used to replace strings in the SQL
     * before using it. Watch out, because it may replace more than you want!
     * @method replace
     * @param {array} [$replacements=array()] This must be an array.
     */
    public function replace(array $replacements = array());

    /**
     * Binds more parameters to the query manually.
     * The parameters are bound in the order they are passed to the query.
     * Here is an example:
     * @example
     *     $result = $db->select('*', 'foo')
     *         ->where(array('a' => $a))
     *         ->andWhere('a = :moo')
     *         ->bind(array('moo' => $moo))
     *         ->execute();
     *
     * @method bind
     * @param {array} [$parameters=array()] An associative array of parameters. The query should contain :name,
     *  where :name is a placeholder for the parameter under the key "name".
     *  The parameters will be properly escaped.
     *  The query may also contain question marks (the binding is
     *  done using PDO), but then the order of the parameters matters.
     * @chainable
     */
    public function bind(array $parameters = array());

    /**
     * Executes a query against the database and returns the result set.
     * @method execute
     * @param {boolean} [$prepare_statement=false] Defaults to false. If true, a PDO statement will be prepared
     *  from the query before it is executed. It is also saved for
     *  future invocations to use.
     *  Do this only if the statement will be executed many times with
     *  different parameters. Basically you would use "->bind(...)" between
     *  invocations of "->execute()".
     * @param {array|string} [$shards] You can pass a shard name here, or an array
     *  where the keys are shard names and the values are the query to execute.
     *  This will bypass the usual sharding algorithm.
     * @return {Db_Result}
     *  The Db_Result object containing the PDO statement that resulted
     *  from the query.
     */
    public function execute($prepare_statement = false, $shards = null);

    /**
     * Begins a transaction right before executing this query.
     * This method lives on the query class because the "where" clauses
     * are often needed to figure out which database to send it to,
     * if sharding is being used.
     * @method begin
     * @chainable
     * @param {string} [$lock_type] The type of lock in the transaction
     */
    public function begin($lock_type = null);

    /**
     * Rolls back a transaction right before executing this query.
     * This method lives on the query class because the "where" clauses
     * are often needed to figure out which database to send it to,
     * if sharding is being used.
     * @method rollback
     * @chainable
     */
    public function rollback();

    /**
     * Commits a transaction right after executing this query.
     * This method lives on the query class because the "where" clauses
     * are often needed to figure out which database to send it to,
     * if sharding is being used.
     * @method commit
     * @chainable
     */
    public function commit();

    /**
     * Creates a query to select fields from one or more tables.
     * @method select
     * @param {string|array} $fields The fields as strings, or array of alias=>field
     * @param {string|array} [$tables=''] The tables as strings, or array of alias=>table
     * @param {boolean} [$reuse=true] If $tables is an array, and select() has
     *  already been called with the exact table name and alias
     *  as one of the tables in that array, then
     *  this table is not appended to the tables list if
     *  $reuse is true. Otherwise it is. $reuse is true by default.
     *  This is really just for using in your hooks.
     * @chainable
     */
    public function select($fields, $tables = '', $reuse = true);

    /**
     * Joins another table to use in the query
     * @method join
     * @param {string} $table The name of the table. May also be "name AS alias".
     * @param {Db_Expression|array|string} $condition The condition to join on. Thus, JOIN table ON ($condition)
     * @param {string} [$join_type='INNER'] The string to prepend to JOIN, such as 'INNER', 'LEFT OUTER', etc.
     * @chainable
     */
    public function join($table, $condition, $join_type = 'INNER');

    /**
     * Adds a WHERE clause to a query
     * @method where
     * @param {Db_Expression|array} $criteria An associative array of expression => value pairs.
     *  The values are automatically escaped using PDO placeholders.
     *  Or, this could be a Db_Expression object.
     * @chainable
     */
    public function where($criteria);

    /**
     * Adds to the WHERE clause, like this: "... AND (x OR y OR z)",
     * where x, y and z are the arguments to this function.
     * @method andWhere
     * @param {Db_Expression|string} $criteria
     * @param {Db_Expression|string} [$or_criteria=null]
     * @chainable
     */
    public function andWhere($criteria, $or_criteria = null);

    /**
     * Adds to the WHERE clause, like this: "... OR (x AND y AND z)",
     * where x, y and z are the arguments to this function.
     * @method orWhere
     * @param {Db_Expression|string} $criteria
     * @param {Db_Expression|string} [$and_criteria=null]
     * @chainable
     */
    public function orWhere($criteria, $and_criteria = null);

    /**
     * Adds a GROUP BY clause to a query
     * @method groupBy
     * @param {Db_Expression|string} $expression
     * @chainable
     */
    public function groupBy($expression);

    /**
     * Adds a HAVING clause to a query
     * @method having
     * @param {Db_Expression|array} $criteria An associative array of expression => value pairs.
     *  The values are automatically escaped using PDO placeholders.
     *  Or, this could be a Db_Expression object.
     * @chainable
     */
    public function having($criteria);

    /**
     * Adds an ORDER BY clause to the query
     * @method orderBy
     * @param {Db_Expression|string} $expression A string or Db_Expression with the expression to order the results by.
     * @param {boolean} [$ascending=true] As the parameter name indicates, true (the default) requests
     *  ascending order and false descending.
     *  NOTE(review): the earlier wording of this doc stated the opposite; confirm against the adapter implementation.
     * @chainable
     */
    public function orderBy($expression, $ascending = true);

    /**
     * Adds optional LIMIT and OFFSET clauses to the query
     * @method limit
     * @param {integer} $limit A non-negative integer showing how many rows to return
     * @param {integer} [$offset=null] Optional. A non-negative integer showing what row to start the result set with.
     * @chainable
     */
    public function limit($limit, $offset = null);

    /**
     * Adds a SET clause to an UPDATE statement
     * @method set
     * @param {array} $updates An associative array of column => value pairs.
     *  The values are automatically escaped using PDO placeholders.
     * @chainable
     */
    public function set(array $updates);

    /**
     * Fetches an array of database rows matching the query.
     * If this exact query has already been executed and
     * fetchAll() has been called on the Db_Result, and
     * the return value was cached by the Db_Result, then
     * that cached value is returned, unless $this->ignoreCache is true.
     * Otherwise, the query is executed and fetchAll() is called on the result.
     *
     * See [PDO documentation](http://us2.php.net/manual/en/pdostatement.fetchall.php)
     * @method fetchAll
     * @param {enum} [$fetch_style=PDO::FETCH_BOTH]
     * @param {enum} [$column_index=null]
     * @param {array} [$ctor_args=array()]
     * @return {array}
     */
    public function fetchAll(
        $fetch_style = PDO::FETCH_BOTH,
        $column_index = null,
        array $ctor_args = array()
    );

    /**
     * Fetches an array of Db_Row objects.
     * If this exact query has already been executed and
     * fetchAll() has been called on the Db_Result, and
     * the return value was cached by the Db_Result, then
     * that cached value is returned, unless $this->ignoreCache is true.
     * Otherwise, the query is executed and fetchDbRows() is called on the result.
     * @method fetchDbRows
     * @param {string} [$class_name='Db_Row'] The name of the class to instantiate and fill objects from.
     *  Must extend Db_Row.
     * @param {string} [$fields_prefix=''] This is the prefix, if any, to strip out when fetching the rows.
     * @return {array}
     */
    public function fetchDbRows(
        $class_name = 'Db_Row',
        $fields_prefix = ''
    );

    /**
     * Adds an ON DUPLICATE KEY UPDATE clause to an INSERT statement.
     * Use only with MySQL.
     * @method onDuplicateKeyUpdate
     * @param {array} $updates An associative array of column => value pairs.
     *  The values are automatically escaped using PDO placeholders.
     * @chainable
     */
    public function onDuplicateKeyUpdate($updates);

    /**
     * Provides an easy way to supply additional clauses to the query.
     * @method options
     * @param {array} $options An associative array of key => value pairs, where the key is
     *  the name of the method to call, and the value is the array of arguments.
     *  If the value is not an array, it is wrapped in one.
     * @chainable
     */
    public function options($options);
}
/**
* This class lets you create and use Db queries.
* @class Db_Query
* @extends Db_Expression
*/
abstract class Db_Query extends Db_Expression
{
    /*
     * Types of queries available right now
     */

    /**
     * Raw query
     * @property TYPE_RAW
     * @type integer
     * @final
     */
    const TYPE_RAW = 1;

    /**
     * Select query
     * @property TYPE_SELECT
     * @type integer
     * @final
     */
    const TYPE_SELECT = 2;

    /**
     * Insert query
     * @property TYPE_INSERT
     * @type integer
     * @final
     */
    const TYPE_INSERT = 3;

    /**
     * Update query
     * @property TYPE_UPDATE
     * @type integer
     * @final
     */
    const TYPE_UPDATE = 4;

    /**
     * Delete query
     * @property TYPE_DELETE
     * @type integer
     * @final
     */
    const TYPE_DELETE = 5;

    /**
     * Rollback query
     * @property TYPE_ROLLBACK
     * @type integer
     * @final
     */
    const TYPE_ROLLBACK = 6;

    /**
     * Default length of the hash used for sharding
     * @property HASH_LEN
     * @type integer
     * @final
     * @default 7
     */
    const HASH_LEN = 7;

    /**
     * Returns a clone of this query object.
     * @method copy
     * @return {Db_Query} The result of clone($this)
     */
    public function copy()
    {
        // We only have to do a shallow copy of the object,
        // because all its properties are arrays, and PHP will copy-on-write
        // them when we modify them in the copy.
        return clone($this);
    }

    /**
     * Returns the shard index that is used for this query's class, if any.
     * The index is read from the 'Db'/'upcoming'/<connection> config when that
     * is set, otherwise from 'Db'/'connections'/<connection>, under
     * ['indexes'][<class name>], where the class name is $this->className with
     * the connection name and one following separator character stripped.
     * A non-null result is remembered in $this->cachedShardIndex.
     * @method shardIndex
     * @return {array|null} The index definition, or null if there is no
     *  class name or no index configured for the class.
     */
    public function shardIndex()
    {
        if (isset($this->cachedShardIndex)) {
            return $this->cachedShardIndex;
        }
        if (!$this->className) {
            return null;
        }
        $conn_name = $this->db->connectionName();
        // strip "<connection name>" plus one separator character from the class name
        $class_name = substr($this->className, strlen($conn_name)+1);
        $info = Q_Config::get('Db', 'upcoming', $conn_name, false);
        if (!$info) {
            $info = Q_Config::get('Db', 'connections', $conn_name, array());
        }
        return $this->cachedShardIndex = isset($info['indexes'][$class_name])
            ? $info['indexes'][$class_name]
            : null;
    }

    /**
     * Analyzes the query's criteria and decides where to execute the query.
     * Here is sample shards config:
     *
     * **NOTE:** *"fields" shall be an object with keys as fields names and values containing hash definition
     * in the format "type%length" where type is one of 'md5' or 'normalize' and length is hash length
     * hash definition can be empty string or false. In such case 'md5%7' is used*
     *
     * **NOTE:** *"partition" can be an array. In such case shards shall be named after partition points*
     *
     *
     *     "Streams": {
     *         "prefix": "streams_",
     *         "dsn": "mysql:host=127.0.0.1;dbname=DBNAME",
     *         "username": "USER",
     *         "password": "PASSWORD",
     *         "driver_options": {
     *             "3": 2
     *         },
     *         "shards": {
     *             "alpha": {
     *                 "prefix": "alpha_",
     *                 "dsn": "mysql:host=127.0.0.1;dbname=SHARDDBNAME",
     *                 "username": "USER",
     *                 "password": "PASSWORD",
     *                 "driver_options": {
     *                     "3": 2
     *                 }
     *             },
     *             "betta": {
     *                 "prefix": "betta_",
     *                 "dsn": "mysql:host=127.0.0.1;dbname=SHARDDBNAME",
     *                 "username": "USER",
     *                 "password": "PASSWORD",
     *                 "driver_options": {
     *                     "3": 2
     *                 }
     *             },
     *             "gamma": {
     *                 "prefix": "gamma_",
     *                 "dsn": "mysql:host=127.0.0.1;dbname=SHARDDBNAME",
     *                 "username": "USER",
     *                 "password": "PASSWORD",
     *                 "driver_options": {
     *                     "3": 2
     *                 }
     *             },
     *             "delta": {
     *                 "prefix": "delta_",
     *                 "dsn": "mysql:host=127.0.0.1;dbname=SHARDDBNAME",
     *                 "username": "USER",
     *                 "password": "PASSWORD",
     *                 "driver_options": {
     *                     "3": 2
     *                 }
     *             }
     *         },
     *         "indexes": {
     *             "Stream": {
     *                 "fields": {"publisherId": "md5", "name": "normalize"},
     *                 "partition": {
     *                     "0000000.       ": "alpha",
     *                     "0000000.sample_": "betta",
     *                     "4000000.       ": "gamma",
     *                     "4000000.sample_": "delta",
     *                     "8000000.       ": "alpha",
     *                     "8000000.sample_": "betta",
     *                     "c000000.       ": "gamma",
     *                     "c000000.sample_": "delta"
     *                 }
     *             }
     *         }
     *     }
     *
     * As a side effect, the point => shard-name mapping derived from the
     * index's "partition" is stored in the private static $mapping, where
     * shard_internal() and map_shard() read it.
     *
     * @method shard
     * @param {array} [$upcoming=null] Temporary config to use in sharding. Used during shard split process only.
     *  NOTE(review): this parameter is not read anywhere in this method's body.
     * @param {array} [$criteria=null] Overrides the sharding criteria for the query. Rarely used unless testing what shards the query would be executed on.
     * @return {array} Returns an array of ($shardName => $query) pairs, where $shardName
     *  can be the name of a shard, '' for just the main shard, or "*" to have the query run on all the shards.
     * @throws {Exception} If the index has no fields, if a Db_Range criterion is used with
     *  a hash other than 'normalize', or if a hash type is not supported.
     */
    public function shard($upcoming = null, $criteria = null)
    {
        if (isset($criteria)) {
            $this->criteria = $criteria;
        }
        $index = $this->shardIndex();
        if (!$index) {
            // no index configured for this class: main shard only
            return array("" => $this);
        }
        if (empty($this->criteria)) {
            // nothing to narrow the query with: run on all shards
            return array("*" => $this);
        }
        if (empty($index['fields'])) {
            throw new Exception("Db_Query: index for {$this->className} should have at least one field");
        }
        if (!isset($index['partition'])) {
            return array("" => $this);
        }
        $hashed = array();
        $fields = array_keys($index['fields']);
        foreach ($fields as $i => $field) {
            if (!isset($this->criteria[$field])) {
                // not enough information to target the query
                return array("*" => $this);
            }
            $value = $this->criteria[$field];
            // empty definition => 'md5' with the default length
            list($hash, $len) = self::parseHashDefinition($index['fields'][$field]);
            if (is_array($value)) {
                // several candidate values: hash each one
                $arr = array();
                foreach ($value as $v) {
                    $arr[] = self::applyHash($v, $hash, $len);
                }
                $hashed[$i] = $arr;
            } else if ($value instanceof Db_Range) {
                if ($hash !== 'normalize') {
                    throw new Exception("Db_Query: ranges don't work with $hash hash");
                }
                $hashed_min = self::applyHash($value->min, $hash, $len);
                $hashed_max = self::applyHash($value->max, $hash, $len);
                $hashed[$i] = new Db_Range(
                    $hashed_min, $value->includeMin, $value->includeMax, $hashed_max
                );
            } else {
                $hashed[$i] = self::applyHash($value, $hash, $len);
            }
        }
        if (array_keys($index['partition']) === range(0, count($index['partition']) - 1)) {
            // $index['partition'] is simple array, name the shards after the partition points
            self::$mapping = array_combine($index['partition'], $index['partition']);
        } else {
            self::$mapping = $index['partition'];
        }
        return $this->shard_internal($index, $hashed);
    }

    /**
     * Calculates the hash of a value according to a hash definition.
     * An empty definition (null, '' or false) selects 'md5' with the default
     * length self::HASH_LEN, the same rule shard() applies to the per-field
     * definitions of an index.
     * @method hashed
     * @static
     * @param {string} $value
     * @param {string} [$hash=null] Hash is one of 'md5' or 'normalize' optionally followed by '%' and number
     * @return {string}
     * @throws {Exception} If the hash type is not supported
     */
    public static function hashed($value, $hash = null)
    {
        // The previous condition here was inverted: a caller-supplied definition
        // was always replaced by 'md5', and an omitted one stayed null and was
        // rejected by applyHash().
        list($type, $len) = self::parseHashDefinition($hash);
        return self::applyHash($value, $type, $len);
    }

    /**
     * Returns an array of field names that are "magic" when used
     * @method magicFieldNames
     * @static
     * @return {array}
     */
    public static function magicFieldNames()
    {
        return array('insertedTime', 'updatedTime', 'created_time', 'updated_time');
    }

    /**
     * Splits a hash definition of the form "type" or "type%length"
     * into its hash type and hash length.
     * @method parseHashDefinition
     * @static
     * @private
     * @param {string|boolean|null} $definition The hash definition. When empty, 'md5' is used.
     * @return {array} array($type, $length), where $length is the part after '%'
     *  as found in the definition, or self::HASH_LEN when there is none.
     */
    private static function parseHashDefinition($definition)
    {
        $definition = !empty($definition) ? $definition : 'md5';
        $parts = explode('%', $definition);
        $len = isset($parts[1]) ? $parts[1] : self::HASH_LEN;
        return array($parts[0], $len);
    }

    /**
     * Calculates hash of the value
     * @method applyHash
     * @static
     * @private
     * @param {string} $value The value to hash. If it is not set (null), it is returned unchanged.
     * @param {string} [$hash='normalize'] Either 'normalize' or 'md5'
     * @param {integer} [$len=self::HASH_LEN] The length to truncate and pad the hash to
     * @return {string}
     * @throws {Exception} If $hash is neither 'normalize' nor 'md5'
     */
    private static function applyHash($value, $hash = 'normalize', $len = self::HASH_LEN)
    {
        if (!isset($value)) {
            return $value;
        }
        switch ($hash) {
            case 'normalize':
                $hashed = substr(Db::normalize($value), 0, $len);
                break;
            case 'md5':
                $hashed = substr(md5($value), 0, $len);
                break;
            default:
                throw new Exception("Db_Query: The hash $hash is not supported");
        }
        // each hash shall have fixed length. Space is less than any char used in hash so
        // let's pad the result to desired length with spaces
        return str_pad($hashed, $len, " ", STR_PAD_LEFT);
    }

    /**
     * Does a depth first search
     * and returns the array of shardname => $query pairs
     * corresponding to which shards are affected.
     * Reads the partition points from the keys of the private static $mapping,
     * which shard() sets before calling this method.
     * @method shard_internal
     * @private
     * @param {array} $index The index definition (not read by this method's body)
     * @param {array} $hashed The hashed criteria, one entry per index field; each entry
     *  is a string, an array of strings, or a Db_Range
     * @return {array}
     * @throws {Exception} If the partition points are not in strictly increasing order
     */
    private function shard_internal($index, $hashed)
    {
        // $index['partition'] shall contain strings "XXXXXX.YYYYYY.ZZZZZZ" where each point has full length of the hash
        $partition = array();
        $last_point = null;
        foreach (array_keys(self::$mapping) as $i => $point) {
            // split each point into its per-field parts
            $partition[$i] = explode('.', $point);
            if (isset($last_point) and strcmp($point, $last_point) <= 0) {
                throw new Exception("Db_Query shard_internal: in {$this->className} partition, point $i is not greater than the previous point");
            }
            $last_point = $point;
        }
        // narrow the points down, then translate each remaining point to its shard name
        $keys = array_map(
            array($this, "map_shard"),
            self::slice_partitions($partition, 0, $hashed)
        );
        // duplicate shard names collapse, since they become array keys
        return array_fill_keys($keys, $this);
    }

    /**
     * Narrows the partition list according to hashes
     * @method slice_partitions
     * @static
     * @private
     * @param {array} $partition List of partition points, each an array of per-field parts
     * @param {integer} $j Currently processed hashed array member
     * @param {array} $hashed
     * @param {boolean} [$adjust=false] Set on recursive calls once a previous field was a Db_Range
     * @return {array} The partition points that remain; may contain duplicates
     * @throws {Exception} If a Db_Range has no minimum set
     */
    private static function slice_partitions($partition, $j, $hashed, $adjust = false)
    {
        // if hashed[$field] is a string only one point shall be found
        // if hashed[$field] is an array, let's process each array member
        // if hashed[$field] is range return all shards from interval min-max
        // do this recursively for each field one by one
        if (count($partition) <= 1) return $partition;
        // this shall be set!
        $hj = $hashed[$j];
        if (is_array($hj)) {
            $result = array();
            $temp = $hashed;
            foreach ($hj as $h) {
                $temp[$j] = $h;
                $result = array_merge(
                    $result,
                    self::slice_partitions($partition, $j, $temp, $adjust)
                );
            }
            // $result may contain duplicates!
            return $result;
        }
        // $hj is a string or Db_Range
        $min = $max = $hj;
        // the range's includeMax is not consulted (the assignment below is
        // commented out), so the upper bound is always treated as inclusive
        $includeMax = true;
        if ($hj instanceof Db_Range) {
            $min = $hj->min;
            $max = $hj->max;
            if (!isset($min)) {
                throw new Exception("Db_Query_Mysql slice_partitions: The minimum of the range should be set.");
            }
            //$includeMax = $hj->includeMax;
        }
        // the first item to keep
        $lower = 0;
        // the last item to keep
        $upper = count($partition)-1;
        // we need this if adjusting result for range search
        $lower_found = $upper_found = false;
        foreach ($partition as $i => $point) {
            // $upper_found shall be reset in each block
            // ($next is left over from the previous iteration, unset on the first one)
            $upper_found = $upper_found && isset($next);
            $current = $point[$j];
            // if $current is bigger than $max nothing to check anymore.
            // but if we adjust for range, we shall look through all partition again
            // to find upper bound at the end of partition array
            if (!$adjust && isset($max) && ($includeMax ? strcmp($current, $max) > 0 : strcmp($current, $max) >= 0)) break;
            // we shall wait till $current and $next are different
            if (($next = isset($partition[$i+1][$j]) ? $partition[$i+1][$j] : null) === $current) continue;
            // when adjusting $next may be less than $current but $lower is already found
            if ($adjust && strcmp($current, $next) > 0) $lower_found = !($next = null);
            // check lower bound we can skip all $partition up to $next but keep $next
            if (!$lower_found && (isset($next) && strcmp($min, $next) >= 0)) $lower = $i+1;
            // now check $next. That's the first time when $max < $next so we've found upper bound
            if (!$upper_found)
                if (!isset($next) || ($includeMax ? strcmp($max, $next) < 0 : strcmp($max, $next) <= 0)) {
                    // we have found upper bound. We can skip all partitions starting from the $next
                    $upper = $i;
                    if (!$adjust) break;
                    else $upper_found = true;
                }
        }
        // we are not interested in points up to $lower and over $upper
        // if $hj is Db_Range - check upper bound
        // if we have checked all $hashed - nothing to check anymore,
        // otherwise - check the rest of $hashed
        if (isset($hashed[$j+1])) {
            return self::slice_partitions(
                array_slice($partition, $lower, $upper-$lower+1),
                $j+1, $hashed, $hj instanceof Db_Range || $adjust
            );
        } else {
            return array_slice($partition, $lower, $upper-$lower+1);
        }
    }

    /**
     * Translates a partition point, given as an array of its per-field parts,
     * into the value stored for that point in the private static $mapping.
     * @method map_shard
     * @static
     * @private
     * @param {array} $a The parts of the point; they are joined with '.' to form the lookup key
     * @return {string}
     */
    private static function map_shard($a)
    {
        return self::$mapping[implode('.', $a)];
    }

    /**
     * Actual points mapping depending if partition is plain or associative array
     * @property $mapping
     * @type array
     * @static
     * @private
     */
    private static $mapping = null;

    /**
     * Class cache
     * @property $cache
     * @type array
     * @static
     */
    public static $cache = array();

    /**
     * Remembers the result of shardIndex() once it is non-null
     * @property $cachedShardIndex
     * @type array
     * @protected
     */
    protected $cachedShardIndex = null;
}