Show:

File: platform/classes/Db/Utils.php

<?php

/**
 * @module Db
 */

/**
 * This class lets you do things related to databases and db results:
 * sorting rows, picking parameters out of arrays, logging, building
 * filenames, exporting variables as PHP source, dumping rows as a table
 * and splitting sharded tables.
 * @class Db_Utils
 * @static
 */

class Db_Utils
{

	/**
	 * Name of the field that compare_dbRows() compares by.
	 * Set by sort() right before it calls usort().
	 * @property $compare_field_name
	 * @type string
	 * @static
	 */
	static $compare_field_name;

	/**
	 * Sort an array of db rows in place, ascending by the given field.
	 * Rows may be objects (the field is read as a property) or
	 * arrays (the field is read as a key).
	 * @method sort
	 * @static
	 * @param {&array} $dbRows The rows to sort. They are re-indexed by usort().
	 * @param {string} $field_name The name of the field to compare rows by
	 * @throws {Exception} If $field_name is empty
	 */
	static function sort (array & $dbRows, $field_name)
	{
		if (empty($field_name)) {
			throw new Exception('Must supply field name to compare by');
		}
		self::$compare_field_name = $field_name;
		usort($dbRows, array(__CLASS__, 'compare_dbRows'));
	}

	/**
	 * Compare two database rows by the field stored in self::$compare_field_name.
	 * Uses PHP's loose comparison operators (> and ==).
	 * @method compare_dbRows
	 * @static
	 * @private
	 * @param {array|object} $dbRow1
	 * @param {array|object} $dbRow2
	 * @return {integer} 1, 0 or -1
	 */
	static private function compare_dbRows ($dbRow1, $dbRow2)
	{
		$field = self::$compare_field_name;
		$value1 = self::fieldValue($dbRow1, $field);
		$value2 = self::fieldValue($dbRow2, $field);
		if ($value1 > $value2) {
			return 1;
		}
		if ($value1 == $value2) {
			return 0;
		}
		return -1;
	}

	/**
	 * Read a field from a db row that is either an array or an object.
	 * @method fieldValue
	 * @static
	 * @private
	 * @param {array|object} $dbRow
	 * @param {string} $field
	 * @return {mixed} The value, or null if an array row lacks the key
	 */
	static private function fieldValue ($dbRow, $field)
	{
		if (is_array($dbRow)) {
			return isset($dbRow[$field]) ? $dbRow[$field] : null;
		}
		return $dbRow->$field;
	}

	/**
	 * A very useful function for stripping off only the
	 * parameters you need from an array.
	 * Use it for passing parameters to functions in a flexible way.
	 * @method take
	 * @static
	 * @param {array} $from The associative array from which to take the parameters,
	 *  consisting of param => value. For example the $_REQUEST array.
	 * @param {array} $parameters An associative array of paramName => defaultValue pairs.
	 *  If the parameter was not found in the $from array,
	 *  the default value is used. If the default value is an Exception,
	 *  it is thrown instead, which makes the parameter required.
	 * @param {string} [$prefix=''] If non-empty, then parameter names are
	 *  prepended with this prefix before searching in $from.
	 *  The prefix is stripped out in the resulting array.
	 *  Typically used for database rows.
	 *  If $parameters is empty, ALL items in $from with
	 *  keys starting with $prefix are returned.
	 *  If both $parameters and $prefix are empty, $from is returned as-is.
	 * @return {array} The parameters stripped off from the $from array,
	 *  according to the above rules.
	 * @throws {Exception} Any Exception given as a default for a missing parameter
	 */
	static function take (array $from, array $parameters, $prefix = '')
	{
		if (count($parameters) > 0) {
			// Take exactly the requested parameters, observing the prefix, if any.
			$result = array();
			foreach ($parameters as $key => $default) {
				if (array_key_exists($prefix . $key, $from)) {
					$result[$key] = $from[$prefix . $key];
					continue;
				}
				// An Exception as the default marks the parameter as required
				if ($default instanceof Exception) {
					throw $default;
				}
				$result[$key] = $default;
			}
			return $result;
		}
		if ($prefix > '') {
			// Parameters aren't specified, but a prefix is.
			$result = array();
			$prefixlen = strlen($prefix);
			foreach ($from as $key => $value) {
				if (strncmp($key, $prefix, $prefixlen) == 0) {
					$result[substr($key, $prefixlen)] = $value;
				}
			}
			return $result;
		}
		return $from;
	}

	/**
	 * Append a message to the system log via syslog().
	 * The log is opened with openlog() on the first call only
	 * (with LOG_PERROR, so messages are also echoed to stderr).
	 * Because of that, the $ident passed on the first call is used for all later calls.
	 * @method log
	 * @static
	 * @param {string} $message the message to append
	 * @param {integer} [$level=LOG_NOTICE] the syslog priority, e.g. LOG_ERR, LOG_WARNING, LOG_NOTICE
	 * @param {boolean} [$timestamp=true] whether to prepend the current timestamp
	 * @param {string} [$ident='Db: '] the ident string passed to openlog() on the first call
	 */
	static function log ($message, $level = LOG_NOTICE, $timestamp = true, $ident = 'Db: ')
	{
		static $logOpen = false;
		if (!$logOpen) {
			openlog($ident, LOG_NDELAY | LOG_PID | LOG_PERROR, LOG_USER);
			$logOpen = true;
		}
		syslog($level,
			($timestamp ? date('Y-m-d H:i:s') . ' ' : '') . $message);
	}

	/**
	 * Combines a dirname and a basename, using a slash if needed.
	 * Use this function to build up paths with the correct DIRECTORY_SEPARATOR.
	 * @method filename
	 * @static
	 * @param {string} $dirname The part of the filename to append to.
	 *  May or may not include a slash at the end.
	 * @param {string} [$basename=null] The part of the filename that comes after the slash.
	 * @param {string} [$basename2=null] You can continue to pass more pieces as the 3rd, 4th etc.
	 *  parameters to this function, and they will all be
	 *  concatenated into one filename.
	 * @return {string} The combined filename. If it does not exist,
	 *  but the filename appended to the current working directory
	 *  exists, then the realpath of the latter is returned.
	 */
	static function filename (
		$dirname, $basename = null, $basename2 = null)
	{
		$args = func_get_args();
		$count = count($args);
		$pieces = array();
		// Strip one trailing separator from every piece except the last
		for ($i = 0; $i < $count - 1; ++$i) {
			$last_char = substr($args[$i], -1);
			$pieces[] = ($last_char == '/' || $last_char == "\\" || $last_char == DS)
				? substr($args[$i], 0, -1)
				: $args[$i];
		}
		$pieces[] = $args[$count - 1];
		$filename = implode(DS, $pieces);
		if (!file_exists($filename)) {
			// In this case, try the current working directory.
			// substr() rather than $filename[0] so an empty filename doesn't warn.
			$cwd = getcwd();
			if (substr($filename, 0, 1) != DS and substr($cwd, -1) != DS) {
				$filename_try = $cwd . DS . $filename;
			} else {
				$filename_try = $cwd . $filename;
			}
			$filename_realpath = realpath($filename_try);
			if ($filename_realpath) {
				return $filename_realpath;
			}
		}
		return $filename;
	}

	/**
	 * Exports a simple variable as PHP source that looks nice, nothing fancy (for now).
	 * Strings become single-quoted literals. Arrays become array(...) with
	 * integer-keyed values first and string-keyed values after them, so the order of
	 * keys is not preserved, but every key => value pair is. Integer keys are
	 * written out explicitly unless they are exactly 0, 1, 2, ... in iteration order.
	 * Everything else is delegated to PHP's var_export().
	 * @method var_export
	 * @static
	 * @param {mixed&} $var the variable to export
	 * @return {string} PHP source evaluating to $var
	 */
	static function var_export (&$var)
	{
		if (is_string($var)) {
			// Inside single quotes only \ and ' need escaping (addslashes would
			// also escape " and NUL, and those backslashes would survive evaluation)
			return "'" . addcslashes($var, "'\\") . "'";
		}
		if (!is_array($var)) {
			return var_export($var, true);
		}
		$indexed_values_quoted = array();
		$keyed_values_quoted = array();
		$next_index = 0;
		$sequential = true;
		foreach ($var as $key => $value) {
			$value = self::var_export($value);
			if (is_string($key)) {
				$keyed_values_quoted[] = "'" . addcslashes($key, "'\\") . "' => $value";
			} else {
				$indexed_values_quoted[$key] = $value;
				if ($key !== $next_index++) {
					$sequential = false;
				}
			}
		}
		if (!$sequential) {
			// Bare values would be renumbered from 0 on evaluation, losing the keys
			foreach ($indexed_values_quoted as $key => $value) {
				$indexed_values_quoted[$key] = "$key => $value";
			}
		}
		$parts = array();
		if (!empty($indexed_values_quoted)) {
			$parts['indexed'] = implode(', ', $indexed_values_quoted);
		}
		if (!empty($keyed_values_quoted)) {
			$parts['keyed'] = implode(', ', $keyed_values_quoted);
		}
		return 'array(' . implode(", \n", $parts) . ')';
	}

	/**
	 * Saves a text file, creating its directory (mode 0755, recursively) if needed.
	 * The write is done with an exclusive lock (LOCK_EX).
	 * Need to enable UTF-8 support here.
	 * @method saveTextFile
	 * @static
	 * @param {string} $filename The name of the file to save to. Can be relative or full.
	 * @param {string} $contents  The text string to save
	 * @return {integer|false} The number of bytes saved, or false if not saved
	 */
	static function saveTextFile ($filename, $contents)
	{
		$dir = dirname($filename);
		// Another process may create the directory between the check and mkdir(),
		// so a failed mkdir() only matters if the directory still doesn't exist.
		if (!is_dir($dir) && !@mkdir($dir, 0755, true) && !is_dir($dir)) {
			return false;
		}
		// TODO: implement semaphore based on filename to eliminate race conditions
		// TODO: use FILE_TEXT for UTF-8 in PHP6
		return @file_put_contents($filename, $contents, LOCK_EX);
	}

	/**
	 * Echoes rows as a tab-separated table, with each column padded with spaces
	 * to the width of its longest value or header.
	 * The header is taken from the keys of the first row.
	 * @method dump_table
	 * @static
	 * @param {array} $rows An array of rows, each an array of key => value
	 */
	static function dump_table ($rows)
	{
		$first_row = true;
		$keys = array();
		$lengths = array();
		foreach ($rows as $row) {
			foreach ($row as $key => $value) {
				if ($first_row) {
					$keys[] = $key;
					$lengths[$key] = strlen((string)$key);
				}
				$val_len = strlen((string)$value);
				// Later rows may carry keys the first row didn't have
				if (!isset($lengths[$key]) || $val_len > $lengths[$key]) {
					$lengths[$key] = $val_len;
				}
			}
			$first_row = false;
		}
		foreach ($keys as $i => $key) {
			$key_len = strlen((string)$key);
			if ($key_len < $lengths[$key]) {
				$keys[$i] .= str_repeat(' ', $lengths[$key] - $key_len);
			}
		}
		echo PHP_EOL;
		echo implode("\t", $keys);
		echo PHP_EOL;
		foreach ($rows as $row) {
			foreach ($row as $key => $value) {
				$val_len = strlen((string)$value);
				if ($val_len < $lengths[$key]) {
					$row[$key] .= str_repeat(' ', $lengths[$key] - $val_len);
				}
			}
			echo implode("\t", $row);
			echo PHP_EOL;
		}
	}

	/**
	 * Split a shard partition into new shards. It takes the full table or a single partition
	 * and splits it into multiple partitions according to the number of provided 'parts'.
	 * When doing the initial split 'shard' may be omitted, however 'fields' shall be provided.
	 * @method split
	 * @static
	 * @param {Q_Tree} $config Contains all necessary information for the split procedure in the following format:
	 * @example
	 *	{
	 *		"plugin": "PLUGINNAME", // the name of the plugin - shall be used by the app
	 *		"connection": "CONNECTIONNAME", // connection - shall be registered with the plugin
	 *		"table": "TABLENAME", // the table to shard
	 *		"class": "CLASSNAME", // the class which is stored in the table
	 *		"fields": {"FIELDNAME": "HASH", "FIELDNAME": "HASH", ...}, // Optional. Used only when starting sharding
	 *		"shard": "SHARDNAME", // Optional. The shard to split. If shards are defined and SHARDNAME is not part of the partition, the script will fail
	 *		"node": ..., // Optional. If set, the split request is sent via the internal node server
	 *		// "parts" can be either an array of connections or an object {"SHARDNAME": connection, ...}
	 *		"parts": {
	 *			"SHARDNAME": {
	 *				"prefix": "PREFIX",
	 *				"dsn": "DSN",
	 *				...
	 *			},
	 *			"SHARDNAME": {
	 *				"prefix": "PREFIX",
	 *				"dsn": "DSN",
	 *				"username": "USERNAME",
	 *				"password": "PASSWORD",
	 *				"driver_options": {
	 *					"3": 2
	 *				}
	 *			},
	 *			...
	 *		}
	 *	}
	 *
	 * @return {boolean} Whether the php part of the process completed successfully
	 */

	static function split ($config) {
		// all input data shall be provided
		// for future extension plugin/connection/table/class are considered unrelated
		if (!($plugin = $config->get('plugin', false))) {
			echo "Plugin name is not defined\n";
			return false;
		}
		// plugin shall be registered!
		if (!Q_Config::get('Q', 'pluginInfo', $plugin, false)) {
			echo "Plugin '$plugin' is not registered in the platform\n";
			return false;
		}
		if (!($connection = $config->get('connection', false))) {
			// $connection is falsy here, so there is no name to report
			echo "Connection name is not defined\n";
			return false;
		}
		// connection shall exist and be registered with plugin!
		if (!Q_Config::get('Db', 'connections', $connection, false)) {
			echo "Connection '$connection' does not exist\n";
			return false;
		}
		if (!in_array($connection, Q_Config::get('Q', 'pluginInfo', $plugin, 'connections', array()))) {
			echo "Connection '$connection' is not registered for plugin '$plugin'\n";
			return false;
		}
		if (!($class = $config->get('class', false))) {
			echo "Class name is not defined\n";
			return false;
		}
		if (!($table = $config->get('table', false))) {
			echo "Table name is not defined\n";
			return false;
		}
		// $shard stays false only when the connection has no shards yet (initial split)
		if (!($shard = $config->get('shard', false)) && Q_Config::get('Db', 'connections', $connection, 'shards', false)) {
			echo "Shard to partition is not defined\n";
			return false;
		}
		if (!($parts = $config->get('parts', false))) {
			echo "New parts are not defined\n";
			return false;
		}
		if ($node = $config->get('node', null)) {
			$nodeInternal = Q_Config::expect('Q', 'nodeInternal');
			$node = array("http://{$nodeInternal['host']}:{$nodeInternal['port']}/Q_Utils/query", $node);
		}

		// now we shall distinguish if table is already sharded or not
		if ($shard === false) {
			if (!($fields = $config->get('fields', false))) {
				echo "To start sharding you shall define 'fields' parameter\n";
				return false;
			}
		}

		// whether the provided split config is mapped (keyed by shard name) or a plain list
		$split_mapped = (array_keys($parts) !== range(0, count($parts) - 1));

		// set up config for shards if it does not exist yet
		if ($shard === false) {
			$lowest = array();
			foreach ($fields as $name => $hash) {
				$part = explode('%', empty($hash) ? 'md5' : $hash);
				$len = isset($part[1]) ? $part[1] : Db_Query::HASH_LEN;
				// "0" has the lowest ascii code for both md5 and normalize
				$lowest[] = str_pad('', $len, "0", STR_PAD_LEFT);
			}
			$shard = join('.', $lowest);
			if (Q_Config::get('Db', 'connections', $connection, 'indexes', $table, false)) {
				echo "Shards are not defined but indexes for table '$table' are defined in local config\n";
				return false;
			}
			// Let's merge in dummy shards section - shard with name '' is handled as single table
			Q_Config::merge(array(
				'Db' => array(
					'connections' => array(
						$connection => array(
							"shards" => array(),
							"indexes" => array(
								$table => array(
									"fields" => $fields,
									"partition" => $split_mapped
										? array($shard => '')
										: array($shard)
								)
							)
						)
					)
				)
			));
			$shard_name = '';
		}

		// get partition information
		if (!($partition = Q_Config::get('Db', 'connections', $connection, 'indexes', $table, 'partition', false))) {
			echo "Upps, cannot get shards partitioning\n";
			return false;
		}

		// whether the main config is mapped or not;
		// $points contains the partitioning array without mapping
		$mapped = (array_keys($partition) !== range(0, count($partition) - 1));
		$points = $mapped ? array_keys($partition) : $partition;

		// Locate the shard among the partition points. Compare as strings:
		// loose comparison treats e.g. '0e12' and '0e34' as equal, and
		// numeric-looking mapped keys come back from array_keys() as integers.
		$i = false;
		foreach ($points as $index => $point) {
			if ((string)$point === (string)$shard) {
				$i = $index;
				break;
			}
		}
		if ($i === false) {
			echo "Shard '$shard' is not found in the partitioning of table '$table'\n";
			return false;
		}
		$next = isset($points[$i + 1]) ? $points[$i + 1] : null;
		$fields = Q_Config::expect('Db', 'connections', $connection, 'indexes', $table, 'fields');
		// now $shard and $next contain boundaries for data to split
		// $points contain partitioning array without mapping - array
		// $parts contains split parts (shards) definition - array or object ($split_mapped)
		// $partition contains current partitioning - array or object ($mapped)
		// $fields contains field names and hashes

		// time to calculate new split point(s)
		if (!isset($shard_name)) {
			$shard_name = $mapped ? $partition[$shard] : $shard;
		}
		$shard_db = $class::db();
		$pdo = $shard_db->reallyConnect($shard_name);
		$shard_table = $class::table();
		$shard_table = str_replace('{$dbname}', $shard_db->dbname, $shard_table);
		$shard_table = str_replace('{$prefix}', $shard_db->prefix, $shard_table);

		// verify if current shard is updated to latest version
		$current_version = $shard_db->select('version', "{$shard_db->prefix}Q_plugin")
								->where(array("plugin" => $plugin))
								->fetchAll(PDO::FETCH_ASSOC);
		if (!empty($current_version)) {
			$current_version = $current_version[0]['version'];
			$version = Q_Config::get('Q', "pluginInfo", $plugin, 'version', null);
			if (Q::compareVersion($current_version, $version) < 0) {
				echo "Please, update plugin '$plugin' to version '$version' (currently $current_version)\n";
				return false;
			}
		} else {
			echo "Cannot get installed version of plugin '$plugin'\n";
			return false;
		}

		// We'll limit search with shard boundaries using latin1 string comparison
		$lower = join(explode('.', $shard));
		$upper = isset($next) ? join(explode('.', $next)) : null;
		$normalize = false;

		$group = $order = array();
		foreach ($fields as $field => $field_hash) {
			$part = explode('%', !empty($field_hash) ? $field_hash : 'md5');
			// Compute the hash name unconditionally: folding it into the
			// "$normalize || ..." test would skip it once $normalize is true.
			$hash = strtoupper($part[0]);
			if ($hash === 'NORMALIZE') {
				$normalize = true;
			}
			$len = isset($part[1]) ? $part[1] : Db_Query::HASH_LEN;
			$group[] = $field;
			$order[] = "CAST($hash($field) AS CHAR($len))";
		}

		// if any field uses 'normalize' hash
		// the original shard shall have MySQL NORMALIZE() function defined
		// MySQL version of NORMALIZE handles only 255 chars and does not add md5 hash
		// (see Db_Utils::normalize)
		if ($normalize) {
			try {
				$pdo->exec("DROP FUNCTION IF EXISTS NORMALIZE;");
				$pdo->exec("CREATE FUNCTION NORMALIZE(s CHAR(255))
						RETURNS CHAR(255) DETERMINISTIC
						BEGIN
					    	DECLARE res CHAR(255) DEFAULT '';
					  		DECLARE t CHAR(1);
					    	WHILE LENGTH(s) > 0 DO
					        	SET t = LOWER(LEFT(s, 1));
					    	    SET s = SUBSTRING(s FROM 2);
					        	IF t REGEXP '[^A-Za-z0-9]' THEN
					            	SET t = '_';
					        	END IF;
					        	SET res = CONCAT(res, t);
					    	END WHILE;
					    	RETURN res;
						END"
					);
			} catch (Exception $e) {
				echo "Please, make sure that db user for shard '$shard_name' has 'CREATE ROUTINE' permission\n";
				return false;
			}
		}

		$order = join(', ', $order);
		$group = join(', ', $group);
		$where = "(STRCMP(CONCAT($order), '$lower') >= 0)"
			.(isset($upper) ? " AND (STRCMP(CONCAT($order), '$upper') < 0)" : "");

		// Fetch into variables first: reset() on a temporary raises a notice,
		// and query() may return false instead of a statement.
		$stmt = $pdo->query("SELECT COUNT(*) FROM $shard_table WHERE $where");
		$count_rows = $stmt ? $stmt->fetchAll(PDO::FETCH_NUM) : array();
		if (empty($count_rows)) {
			echo "Failed to connect to shard '$shard_name'\n";
			return false;
		}
		$count = (int)$count_rows[0][0];

		if ($count === 0) {
			echo "Cannot split empty shard!\n";
			return false;
		}

		// if only one new shard is provided the script will copy data and change config
		if (($num_shards = count($parts)) < 1) {
			echo "Please, provide at least one new shard";
			return false;
		}

		// number of rows per new shard, used as the LIMIT offset step
		$break = (int)round($count / $num_shards);
		$part_names = array_keys($parts);
		$first_name = $split_mapped ? $part_names[0] : $shard;
		// if split config is not mapped and current config is mapped we shall convert split
		// config to mapped
		$new_partition = ($mapped || $split_mapped)
			? array($shard => $first_name)
			: array($shard);
		$new_shards = array($first_name => reset($parts));

		$i = 0;
		// preserve_keys so that numeric-looking shard names survive the slice
		foreach (array_slice($parts, 1, null, true) as $name => $part_config) {
			$offset = $break * (++$i);
			$stmt = $pdo->query("SELECT $group FROM $shard_table WHERE $where ORDER BY $order LIMIT $offset, 1");
			$split_rows = $stmt ? $stmt->fetchAll(PDO::FETCH_ASSOC) : array();
			if (empty($split_rows)) {
				echo "Cannot calculate split point number $i for shard '$shard_name'\n";
				return false;
			}
			$split = $split_rows[0];
			foreach ($fields as $field => $hash) {
				$split[$field] = Db_Query::hashed($split[$field], $hash);
			}
			$split = join('.', $split);
			if ($mapped || $split_mapped) {
				$new_partition[$split] = ($split_mapped ? $name : $split);
			} else {
				$new_partition[] = $split;
			}
			$new_name = ($split_mapped ? $name : $split);
			$new_shards[$new_name] = $part_config;
			if (Q_Config::get('Db', 'connections', $connection, 'shards', $new_name, false)) {
				echo "WARNING!!! Shard already exists: '$new_name'\n";
			}
		}

		Q_Config::merge(array(
			'Db' => array(
				'connections' => array(
					$connection => array(
						"shards" => $new_shards
					)
				)
			)
		));

		// if split config is mapped and current config is not we shall convert app config to mapped
		if ($split_mapped && !$mapped) {
			$partition = array();
			foreach ($points as $point) {
				$partition[$point] = $point;
			}
			Q_Config::set('Db', 'connections', $connection, 'indexes', $table, 'partition', $partition);
			$mapped = true;
		}

		// TODO: verify if new shards sizes are approx. equal

		// Verify versions of existing shards and
		// install plugin schema to new shards
		Q_Plugin::installSchema(Q_PLUGINS_DIR.DS.$plugin, $plugin, 'plugin', $connection, array('sql' => array($connection => array('enabled' => true))));

		// make sure 'upcoming' config is loaded
		$configFiles = Q_Config::get('Q', 'configFiles', array());

		// 'local/Q/bootstrap.json' should be loaded already but we'd better check
		if (!in_array('Q/config/bootstrap.json', $configFiles)) {
			echo "Config file 'Q/config/bootstrap.json' shall be loaded via 'Q/configFiles key'\non every shard - check 'platform/config/Q.json'\n";
			return false;
		}

		$upcoming_file = Q_Config::get('Q', 'internal', 'sharding', 'upcoming', 'Db/config/upcoming.json');

		if (!in_array($upcoming_file, $configFiles)) {
			// add upcoming.json to config
			if (!Q_Config::setOnServer(
				'Q/config/bootstrap.json',
				array(
					'Q' => array(
						'configFiles' => array(
							$upcoming_file
						)
					)
				))) {
				echo "Failed to update 'local/Q/bootstrap.json'\n";
				return false;
			}
		}

		// Now after some short time all workers (php and node) will be ready for splitting
		// We'll let node server wait the necessary amount of time.
		$res = Q_Utils::queryInternal('Db/Shards', array(
				'Q/method' => 'split',
				'shard' => $shard_name,
				'shards' => Q::json_encode($new_shards),
				'part' => $shard,
				'table' => $table,
				'dbTable' => $shard_table,
				'class' => $class,
				'plugin' => $plugin,
				'connection' => $connection,
				'where' => $where,
				'parts' => Q::json_encode(array('partition' => $new_partition, 'fields' => $fields))
			), $node);
		if ($res) {
			echo "Split process for shard '$shard_name' ($shard) has started\nPlease, monitor node.js console for important messages and process status\n";
			return true;
		}
		echo "Failed to start split process at node server\n";
		return false;
	}

	/**
	 * Attempts to recover an interrupted shards split process.
	 * If a 'Db/upcoming' config and a 'Db/internal/sharding/logServer' are both set,
	 * asks the internal 'Db/Shards' handler to reset; otherwise prints
	 * manual recovery instructions.
	 * @method splitRecover
	 * @static
	 * @return {boolean} Whether the reset request succeeded
	 */
	static function splitRecover () {
		if (Q_Config::get('Db', 'upcoming', false) && ($node = Q_Config::get('Db', 'internal', 'sharding', 'logServer', false))) {
			if (Q_Utils::queryInternal('Db/Shards', array('Q/method' => 'reset'), $node)) {
				echo "Split process was reset successfuly\n";
				return true;
			}
		}
		echo "Please, remove 'Db/config/upcoming.json', verify config, drop new shards and start split process again\n";
		return false;
	}
}