<?php
/**
* Streams model
* @module Streams
* @main Streams
*/
/**
* Static methods for the Streams models.
* @class Streams
* @extends Base_Streams
* @abstract
*/
abstract class Streams extends Base_Streams
{
/*
* This is where you would place all the static methods for the models,
* the ones that don't strongly pertain to a particular row or table.
*/
/**
* Read levels
* @property $READ_LEVEL
* @type array
*/
/**
* Can't see the stream
* @property $READ_LEVEL['none']
* @type integer
* @default 0
* @final
*/
/**
* Can see icon and title
* @property $READ_LEVEL['see']
* @type integer
* @default 10
* @final
*/
/**
* Can see the stream's content
* @property $READ_LEVEL['content']
* @type integer
* @default 20
* @final
*/
/**
* Can see relations to other streams
* @property $READ_LEVEL['relations']
* @type integer
* @default 25
* @final
*/
/**
* Can see participants in the stream
* @property $READ_LEVEL['participants']
* @type integer
* @default 30
* @final
*/
/**
* Can play stream in a player
* @property $READ_LEVEL['messages']
* @type integer
* @default 40
* @final
*/
/**
* Max read level
* @property $READ_LEVEL['max']
* @type integer
* @default 40
* @final
*/
// Named read-access levels, mapped to their numeric values.
// Streams::calculateAccess() compares these numerically, so a larger
// number grants more read access.
public static $READ_LEVEL = array(
'none' => 0, // can't see the stream
'see' => 10, // can see icon and title
'content' => 20, // can see the stream's content
'relations' => 25, // can see relations to other streams
'participants' => 30, // can see participants in the stream
'messages' => 40, // can play stream in a player
'max' => 40 // highest read level; currently the same value as 'messages'
);
/**
* Write levels
* @property $WRITE_LEVEL
* @type array
*/
/**
* Cannot affect stream or participants list
* @property $WRITE_LEVEL['none']
* @type integer
* @default 0
* @final
*/
/**
* Can become a participant, chat, and leave
* @property $WRITE_LEVEL['join']
* @type integer
* @default 10
* @final
*/
/**
* Can vote for a relation message posted to the stream.
* @property $WRITE_LEVEL['vote']
* @type integer
* @default 13
* @final
*/
/**
* Can post messages, but manager must approve
* @property $WRITE_LEVEL['postPending']
* @type integer
* @default 18
* @final
*/
/**
* Can post messages which appear immediately
* @property $WRITE_LEVEL['post']
* @type integer
* @default 20
* @final
*/
/**
* Can post messages relating other streams to this one
* @property $WRITE_LEVEL['relate']
* @type integer
* @default 23
* @final
*/
/**
* Can update properties of relations directly
* @property $WRITE_LEVEL['relations']
* @type integer
* @default 25
* @final
*/
/**
* Can post messages requesting edits of stream
* @property $WRITE_LEVEL['suggest']
* @type integer
* @default 28
* @final
*/
/**
* Can post messages to edit stream content immediately
* @property $WRITE_LEVEL['edit']
* @type integer
* @default 30
* @final
*/
/**
* Can post a message requesting to close the stream
* @property $WRITE_LEVEL['closePending']
* @type integer
* @default 35
* @final
*/
/**
* Don't delete, just prevent any new changes to stream
* however, joining and leaving is still ok
* @property $WRITE_LEVEL['close']
* @type integer
* @default 40
* @final
*/
/**
* Max write level
* @property $WRITE_LEVEL['max']
* @type integer
* @default 40
* @final
*/
// Named write-access levels, mapped to their numeric values.
// Streams::calculateAccess() compares these numerically, so a larger
// number grants more write access.
public static $WRITE_LEVEL = array(
'none' => 0, // cannot affect stream or participants list
'join' => 10, // can become a participant, chat, and leave
'vote' => 13, // can vote for a relation message posted to the stream
'postPending' => 18, // can post messages, but manager must approve
'post' => 20, // can post messages which appear immediately
'relate' => 23, // can post messages relating other streams to this one
'relations' => 25, // can update properties of relations directly
'suggest' => 28, // can post messages requesting edits of stream
'edit' => 30, // can post messages to edit stream content immediately
'closePending' => 35, // can post a message requesting to close the stream
'close' => 40, // prevent any new changes to stream; joining and leaving is still ok
'max' => 40 // highest write level; currently the same value as 'close'
);
/**
* Admin levels
* @property $ADMIN_LEVEL
* @type array
*/
/**
* Cannot do anything related to admin / users
* @property $ADMIN_LEVEL['none']
* @type integer
* @default 0
* @final
*/
/**
* Can prove things about the stream's content or participants
* @property $ADMIN_LEVEL['tell']
* @type integer
* @default 10
* @final
*/
/**
* Able to create invitations for others, granting access
* and permissions up to what they themselves have
* @property $ADMIN_LEVEL['invite']
* @type integer
* @default 20
* @final
*/
/**
* Can approve posts, and give people any adminLevel < 'manage'
* @property $ADMIN_LEVEL['manage']
* @type integer
* @default 30
* @final
*/
/**
* Can give people any adminLevel <= 'own'
* @property $ADMIN_LEVEL['own']
* @type integer
* @default 40
* @final
*/
/**
* Max admin level
* @property $ADMIN_LEVEL['max']
* @type integer
* @default 40
* @final
*/
// Named admin-access levels, mapped to their numeric values.
// Streams::calculateAccess() compares these numerically, so a larger
// number grants more admin access.
public static $ADMIN_LEVEL = array(
'none' => 0, // cannot do anything related to admin / users
'tell' => 10, // can prove things about the stream's content or participants
'invite' => 20, // can create invitations, granting up to what they themselves have
'manage' => 30, // can approve posts, and give people any adminLevel < 'manage'
'own' => 40, // can give people any adminLevel <= 'own'
'max' => 40 // highest admin level; currently the same value as 'own'
);
/**
* Access sources
* @property $ACCESS_SOURCES
* @type array
*/
/**
* Public access
* @property $ACCESS_SOURCES['public']
* @type integer
* @default 0
* @final
*/
/**
* From contact
* @property $ACCESS_SOURCES['contact']
* @type integer
* @default 1
* @final
*/
/**
* Direct access
* @property $ACCESS_SOURCES['direct']
* @type integer
* @default 2
* @final
*/
/**
* Inherited public access
* @property $ACCESS_SOURCES['inherited_public']
* @type integer
* @default 3
* @final
*/
/**
* Inherited from contact
* @property $ACCESS_SOURCES['inherited_contact']
* @type integer
* @default 4
* @final
*/
/**
* Inherited direct access
* @property $ACCESS_SOURCES['inherited_direct']
* @type integer
* @default 5
* @final
*/
// Codes recording where a calculated access level came from.
// Streams::calculateAccess() stores these under the keys
// 'readLevel_source', 'writeLevel_source', 'adminLevel_source'
// and 'permissions_source' on each stream.
public static $ACCESS_SOURCES = array(
'public' => 0, // the stream's own public levels
'contact' => 1, // granted through a contact label
'direct' => 2, // granted directly to the user
'inherited_public' => 3, // inherited public access
'inherited_contact' => 4, // inherited from contact
'inherited_direct' => 5 // inherited direct access
);
/**
* Fetches streams from the database.
* @method fetch
* @static
* @param {string} $asUserId
* Set this to the user for which you are fetching the streams.
* If this matches the publisherId, just returns the streams.
* If this is '', only returns the streams anybody can see.
* Otherwise, return the streams joined with the calculated access settings.
* If you pass null here, then either the logged-in user's id or '' will be used.
* @param {string} $publisherId
* The id of the user publishing these streams
* @param {string|array|Db_Range} $name
* The name of the stream to fetch. Can end in "/" for template streams.
* Also it can be an array of stream names, or a custom Db_Range for stream names
* @param {string} [$fields='*']
* Comma delimited list of fields to retrieve in the stream.
* Must include at least "publisherId" and "name",
* since they make up the primary key of the stream table.
* @param {array} [$options=array()]
* Provide additional query options like 'limit', 'offset', 'orderBy', 'where' etc.
* See Db_Query_Mysql::options().
* @param {boolean} [$options.refetch] Ignore cache of previous calls to fetch,
* and save a new cache if necessary.
* @param {boolean} [$options.dontCache] Do not cache the results of
* fetching the streams
* @param {boolean} [$options.withParticipant=false]
* Additionally call ->set('participant', $p) on the stream objects,
* with the participant object corresponding to $asUserId, if any.
* @param {array} [$options.withMessageTotals]
* Pass an array of ($streamName => $messageTypes) here
* to additionally call ->set('messageTotals', $t) on the stream objects.
* @param {array} [$options.withRelatedToTotals]
* pass array('withRelatedToTotals' => array($streamName => true)) for all rows
* pass array('withRelatedToTotals' => array($streamName => array($relationType, ...))) for particular rows
* to additionally call ->set('relatedToTotals', $t) on the stream objects.
* @param {array} [$options.withRelatedFromTotals]
* pass array('withRelatedFromTotals' => array($streamName => true)) for all rows
* pass array('withRelatedFromTotals' => array($streamName => array('relationType', ...))) for particular rows
* to additionally call ->set('relatedFromTotals', $t) on the stream objects.
* @return {array}
* Returns an array of Streams_Stream objects with access info calculated
* specifically for $asUserId . Make sure to call the methods
* testReadLevel(), testWriteLevel() and testAdminLevel()
* on these streams before using them on the user's behalf.
*/
static function fetch(
	$asUserId,
	$publisherId,
	$name,
	$fields = '*',
	$options = array())
{
	// null means "whoever is logged in", falling back to '' when nobody is
	if (!isset($asUserId)) {
		$asUserId = Users::loggedInUser();
		if (!$asUserId) $asUserId = "";
	}
	if ($asUserId instanceof Users_User) {
		$asUserId = $asUserId->id;
	}
	if ($publisherId instanceof Users_User) {
		$publisherId = $publisherId->id;
	}
	if (empty($publisherId) or empty($name)) {
		return array();
	}
	// Support the shorthand fetch($asUserId, $publisherId, $name, $options)
	if (is_array($fields)) {
		$options = $fields;
		$fields = '*';
	}
	// Cache entries are keyed by the field list exactly as the caller passed it.
	// Capture it here, before '*' is expanded for the query below; otherwise
	// entries would be stored under the expanded list but looked up under '*',
	// and the default fetch would never hit the cache.
	// Streams::create() also stores its entries under '*'.
	$cacheKey = $fields;

	// Split the requested names into those already cached and those to query.
	// Only plain names can be looked up in the cache; anything else
	// (e.g. a Db_Range) is always passed through to the database.
	$allCached = array();
	if (empty($options['refetch']) and (is_array($name) or is_string($name))) {
		$requested = is_array($name) ? $name : array($name);
		$namesToFetch = array();
		foreach ($requested as $n) {
			if (isset(self::$fetch[$asUserId][$publisherId][$n][$cacheKey])) {
				$allCached[$n] = self::$fetch[$asUserId][$publisherId][$n][$cacheKey];
			} else {
				$namesToFetch[] = $n;
			}
		}
	} else {
		$namesToFetch = $name;
	}
	if ($fields === '*') {
		$fields = join(',', Streams_Stream::fieldNames());
	}
	$criteria = array(
		'publisherId' => $publisherId,
		'name' => $namesToFetch
	);

	// Get streams and set their default access info
	$allRetrieved = $namesToFetch
		? Streams_Stream::select($fields)
			->where($criteria)
			->ignoreCache()
			->options($options)
			->fetchDbRows(null, '', 'name')
		: array();

	$streams = $allCached ? array_merge($allCached, $allRetrieved) : $allRetrieved;

	// Attach participant rows to every stream being returned, cached ones
	// included, so that the option is honored regardless of cache hits.
	// Skipped entirely when there are no streams, to avoid querying
	// with an empty list of names.
	if ($streams and !empty($options['withParticipant']) and $asUserId) {
		$participantNames = array();
		foreach ($streams as $s) {
			$participantNames[] = $s->name;
		}
		$prows = Streams_Participant::select()->where(array(
			'publisherId' => $publisherId,
			'streamName' => $participantNames,
			'userId' => $asUserId
		))->fetchDbRows(null, '', 'streamName');
		foreach ($streams as $s) {
			$s->set('participant', Q::ifset($prows, $s->name, null));
		}
	}

	Streams::calculateAccess($asUserId, $publisherId, $streams, false);
	$streams = self::messageTotals($publisherId, $name, $options, $streams);
	$streams = self::relatedToTotals($publisherId, $name, $options, $streams);
	$streams = self::relatedFromTotals($publisherId, $name, $options, $streams);

	if (is_array($name) and count($name) > 1) {
		// put the streams back in the same internal PHP array order
		// and in the process honor any duplicate names that might have been passed
		$temp = $streams;
		$streams = array();
		foreach ($name as $n) {
			$streams[$n] = isset($temp[$n]) ? $temp[$n] : null;
		}
	}

	// Collect the distinct stream types present, skipping missing (null) entries
	$types = array();
	foreach ($streams as $stream) {
		if ($stream) {
			$types[$stream->type] = true;
		}
	}
	$types = array_keys($types);

	self::afterFetchExtended($publisherId, $streams);

	// Fire one "after" event per stream type, passing the subsets of that type
	foreach ($types as $type) {
		$cached = array();
		$retrieved = array();
		foreach ($allCached as $n => $s) {
			if ($s->type === $type) {
				$cached[$n] = $s;
			}
		}
		foreach ($allRetrieved as $n => $s) {
			if ($s->type === $type) {
				$retrieved[$n] = $s;
			}
		}
		$params = array(
			'streams' => &$streams,
			'cached' => $cached,
			'retrieved' => $retrieved,
			'allCached' => $allCached,
			'allRetrieved' => $allRetrieved,
			'asUserId' => $asUserId,
			'publisherId' => $publisherId,
			'name' => $name,
			'criteria' => $criteria,
			'fields' => $fields,
			'options' => $options,
			'type' => $type
		);
		/**
		 * @event Streams/fetch/$streamType {after}
		 * @param {&array} streams
		 * @param {string} asUserId
		 * @param {string} publisherId
		 * @param {string} name
		 * @param {array} criteria
		 * @param {string} fields
		 * @param {array} options
		 */
		Q::event("Streams/fetch/$type", $params, 'after', false, $streams);
	}

	// Remember the results under the same key that the lookup above uses
	if (!self::$dontCache and empty($options['dontCache'])) {
		foreach ($streams as $n => $stream) {
			self::$fetch[$asUserId][$publisherId][$n][$cacheKey] = $stream;
		}
	}
	return $streams;
}
/**
* Fetches one stream from the database.
* @method fetchOne
* @static
* @param {string} $asUserId
* Set this to the user for which you are fetching the streams.
* If this matches the publisherId, just returns the streams.
* If this is '', only returns the streams anybody can see.
* Otherwise, return the streams joined with the calculated access settings.
* If you pass null here, then either the logged-in user's id or '' will be used.
* @param {string} $publisherId
* The id of the user publishing these streams
* @param {string|array|Db_Range} $name
* The name of the stream to fetch. Can end in "/" for template streams.
* Also it can be an array of stream names, or a custom Db_Range for stream names
* @param {string|boolean} $fields='*'
* Must include "publisherId" and "name" fields, since they
* make up the primary key of the stream table.
* Pass true here to throw an exception if the stream is missing.
* @param {array} $options=array()
* Provide additional query options like 'limit', 'offset', 'orderBy', 'where' etc.
* See Db_Query_Mysql::options().
* @param {boolean} [$options.refetch] Ignore cache of previous calls to fetch,
* and save a new cache if necessary.
* @param {boolean} [$options.dontCache] Do not cache the results of
* fetching the streams
* @param {boolean} [$options.withParticipant] Additionally call ->set('participant', $p)
* on the stream object, with the participant object corresponding to $asUserId, if any.
* @param {array} [$options.withMessageTotals]
* Pass an array of arrays ($streamName => $messageTypes) here
* to additionally call ->set('messageTotals', $t) on the stream objects.
* @param {array} [$options.withRelatedToTotals]
* pass array('withRelatedToTotals' => array('streamName' => true)) for all rows
* pass array('withRelatedToTotals' => array('streamName' => array('relationType', ...))) for particular rows
* to additionally call ->set('relatedToTotals', $t) on the stream objects.
* @param {array} [$options.withRelatedFromTotals]
* pass array('withRelatedFromTotals' => array('streamName' => true)) for all rows
* pass array('withRelatedFromTotals' => array('streamName' => array('relationType', ...))) for particular rows
* to additionally call ->set('relatedFromTotals', $t) on the stream objects.
* @return {Streams_Stream|null}
* Returns a Streams_Stream object with access info calculated
* specifically for $asUserId . Make sure to call the methods
* testReadLevel(), testWriteLevel() and testAdminLevel()
* on these streams before using them on the user's behalf.
*/
static function fetchOne(
	$asUserId,
	$publisherId,
	$name,
	$fields = '*',
	$options = array())
{
	// Passing true instead of a field list means:
	// select all fields, and throw if nothing is found
	$mustExist = ($fields === true);
	if ($mustExist) {
		$fields = '*';
	}
	// Only a single row is wanted, whatever limit the caller supplied
	$options['limit'] = 1;

	$found = Streams::fetch($asUserId, $publisherId, $name, $fields, $options);
	if (!empty($found)) {
		return reset($found);
	}
	if (!$mustExist) {
		return null;
	}
	throw new Q_Exception_MissingRow(array(
		'table' => 'Stream',
		'criteria' => Q::json_encode(compact('publisherId', 'name'))
	));
}
/**
* Calculates the access for one or more streams by querying the database
* Modifies the objects in the $streams array, setting their access levels.
* After the function returns, you will be able to call the methods
* testReadLevel(), testWriteLevel() and testAdminLevel()
* on these streams before using them on the user's behalf.
* @method calculateAccess
* @static
* @param {string} $asUserId
* Set this to the user relative to whom access is calculated.
* If this matches the publisherId, just sets full access and calls publishedByFetcher(true).
* If this is '', only returns the streams anybody can see.
* If this is null, the logged-in user's id is used, or '' if no one is logged in
* @param {string} $publisherId
* The id of the user publishing these streams
* @param {array} $streams
* An array of streams, obtained for example by Streams::fetch
* @param {boolean} [$recalculate=false]
* Pass true here to force recalculating access to streams for which access was already calculated
* @param {string} [$actualPublisherId=null]
* For internal use only. Used by Streams::isAuthorizedToCreate function.
* @param {boolean} [$inheritAccess=true]
* Set to false to skip inheriting access from other streams, even if specified
* @return {integer}
* The number of streams that were recalculated
*/
static function calculateAccess(
$asUserId,
$publisherId,
$streams,
$recalculate = false,
$actualPublisherId = null,
$inheritAccess = true)
{
// null means "whoever is logged in", falling back to '' when nobody is
if (!isset($asUserId)) {
$asUserId = Users::loggedInUser();
if (!$asUserId) $asUserId = "";
}
if ($asUserId instanceof Users_User) {
$asUserId = $asUserId->id;
}
if ($publisherId instanceof Users_User) {
$publisherId = $publisherId->id;
}
if (!isset($actualPublisherId)) {
$actualPublisherId = $publisherId;
}
// $streams2 = the streams that need calculating: all of them when
// $recalculate is set, otherwise only those with no 'readLevel' set yet
if ($recalculate) {
$streams2 = $streams;
} else {
$streams2 = array();
foreach ($streams as $k => $s) {
if ($s->get('readLevel', null) === null) {
$streams2[$k] = $s;
}
}
}
if (empty($streams2)) {
return 0;
}
$public_source = Streams::$ACCESS_SOURCES['public'];
$contact_source = Streams::$ACCESS_SOURCES['contact'];
$direct_source = Streams::$ACCESS_SOURCES['direct'];
// $streams3 = the streams that still need access rows looked up;
// $names = the stream names (and "type*" wildcards) to look them up by
$streams3 = array();
$names = array();
foreach ($streams2 as $s) {
// Skip streams already marked as calculated for this same user
if ($s->get('asUserId', null) === $asUserId) {
continue;
}
$s->set('asUserId', $asUserId);
if ($asUserId and $asUserId == $actualPublisherId) {
// The publisher should have full access to every one of their streams.
// Streams which are "required", though, won't be deleted by the system.
$required = Q_Config::get('Streams', 'requiredUserStreams', $s->name, false);
$s->set('isRequired', $required);
$s->set('readLevel', Streams::$READ_LEVEL['max']);
$s->set('writeLevel', Streams::$WRITE_LEVEL['max']);
$s->set('adminLevel', Streams::$ADMIN_LEVEL['max']);
$s->set('readLevel_source', $direct_source);
$s->set('writeLevel_source', $direct_source);
$s->set('adminLevel_source', $direct_source);
$s->publishedByFetcher(true);
continue;
}
// Start from the levels and permissions stored on the stream itself,
// recording their source as "public"
$s->set('readLevel', $s->readLevel);
$s->set('writeLevel', $s->writeLevel);
$s->set('adminLevel', $s->adminLevel);
$s->set('permissions', $s->getAllPermissions());
$s->set('readLevel_source', $public_source);
$s->set('writeLevel_source', $public_source);
$s->set('adminLevel_source', $public_source);
$s->set('permissions_source', $public_source);
if (empty($asUserId)) {
continue; // No need to fetch further access info.
}
// Access rows may target the exact stream name or every stream of a type ("type*")
$names[] = $s->name;
$names[] = $s->type."*";
$streams3[] = $s;
}
if (empty($streams3)) {
return count($streams2);
}
// Get the per-label access data
// Avoid making a join to allow more flexibility for sharding
$accesses = Streams_Access::select()
->where(array(
'publisherId' => array($publisherId, ''),
'streamName' => $names,
'ofUserId' => array('', $asUserId)
))->ignoreCache()->fetchDbRows();
// Collect the contact labels mentioned by the access rows
$labels = array();
foreach ($accesses as $access) {
if ($access->ofContactLabel) {
$labels[] = $access->ofContactLabel;
}
}
if (!empty($labels)) {
$labels = array_unique($labels);
// Find which of those labels $actualPublisherId has assigned to $asUserId
$contacts = Users_Contact::select()
->where(array(
'userId' => $actualPublisherId,
'label' => $labels,
'contactUserId' => $asUserId
))->fetchDbRows();
foreach ($contacts as $contact) {
foreach ($accesses as $access) {
if ($access->ofContactLabel !== $contact->label) {
continue;
}
foreach ($streams3 as $stream) {
// The access row applies if it names this stream exactly,
// or is a wildcard "type*" matching this stream's type
$tail = substr($access->streamName, -1);
$head = substr($access->streamName, 0, -1);
if ($stream->name !== $access->streamName
and ($tail !== '*' or $head !== $stream->type)) {
continue;
}
// Label-based access only ever raises a level, never lowers it.
// Negative levels in the access row are skipped
// (presumably meaning "not specified" -- TODO confirm).
$readLevel = $stream->get('readLevel', 0);
$writeLevel = $stream->get('writeLevel', 0);
$adminLevel = $stream->get('adminLevel', 0);
if ($access->readLevel >= 0 and $access->readLevel > $readLevel) {
$stream->set('readLevel', $access->readLevel);
$stream->set('readLevel_source', $contact_source);
}
if ($access->writeLevel >= 0 and $access->writeLevel > $writeLevel) {
$stream->set('writeLevel', $access->writeLevel);
$stream->set('writeLevel_source', $contact_source);
}
if ($access->adminLevel >= 0 and $access->adminLevel > $adminLevel) {
$stream->set('adminLevel', $access->adminLevel);
$stream->set('adminLevel_source', $contact_source);
}
// Permissions from the label are added to the ones already on the stream
$p1 = $access->getAllPermissions();
$p2 = $stream->get('permissions', array());
$stream->set('permissions', array_unique(array_merge($p1, $p2)));
$stream->set('permissions_source', $contact_source);
}
}
}
}
// Override with per-user access data
foreach ($accesses as $access) {
foreach ($streams3 as $stream) {
$tail = substr($access->streamName, -1);
$head = substr($access->streamName, 0, -1);
if ($stream->name !== $access->streamName
and ($tail !== '*' or $head !== $stream->type)) {
continue;
}
// Rows addressed directly to this user replace whatever was calculated
// so far, whether higher or lower; only negative levels are skipped
if ($access->ofUserId === $asUserId) {
if ($access->readLevel >= 0) {
$stream->set('readLevel', $access->readLevel);
$stream->set('readLevel_source', $direct_source);
}
if ($access->writeLevel >= 0) {
$stream->set('writeLevel', $access->writeLevel);
$stream->set('writeLevel_source', $direct_source);
}
if ($access->adminLevel >= 0) {
$stream->set('adminLevel', $access->adminLevel);
$stream->set('adminLevel_source', $direct_source);
}
$stream->set('permissions', $access->getAllPermissions());
$stream->set('permissions_source', $direct_source);
}
}
}
if ($inheritAccess) {
// $streams4 = streams with a usable inheritAccess list;
// $toFetch = names of the streams to inherit from, grouped by publisher id
$streams4 = array();
$toFetch = array();
foreach ($streams3 as $s) {
if (empty($s->inheritAccess)) {
continue;
}
// NOTE(review): this reuses the $inheritAccess parameter as a local.
// It is harmless here because the flag was already tested above.
$inheritAccess = json_decode($s->inheritAccess, true);
if (!$inheritAccess or !is_array($inheritAccess)) {
continue;
}
$streams4[] = $s;
foreach ($inheritAccess as $ia) {
// The first element of each entry is used as the publisher id,
// and the following element as the stream name
$toFetch[reset($ia)][] = next($ia);
}
}
// group the fetches by publisher and execute them in batches
// NOTE(review): the loop variable overwrites the $publisherId parameter,
// which is not read again after this point.
foreach ($toFetch as $publisherId => $streamNames) {
$streamNames = array_unique($streamNames);
Streams::fetch($asUserId, $publisherId, $streamNames);
}
// this will now use the cached results of the above calls to Streams::fetch
foreach ($streams4 as $s) {
$s->inheritAccess();
}
}
return count($streams2);
}
/**
* Calculates whether a given user is authorized by a specific publisher
* to create a particular type of stream.
* @method isAuthorizedToCreate
* @static
* @param {string} $userId The user who would be creating the stream.
* @param {string} $publisherId The id of the user who would be publishing the stream.
* @param {string} $streamType The type of the stream that would be created
* @param {array} [$relate=array()]
* The user would also be authorized if the stream would be related to
* an existing category stream, in which the user has a writeLevel of at least "relate",
* and the user that would be publishing this new stream has a template for this stream type
* that is related to either the category stream or a template matching the category stream.
* To test for this, pass an array with the following keys:
* @param {string} $relate.publisherId The id of the user publishing that stream, defaults to $publisherId
* @param {string} $relate.streamName The name of the stream to which the new stream would be related
* @param {string} $relate.type The type of relation, defaults to ""
* @return {Streams_Stream|boolean} Returns a stream template the user must use,
* otherwise a boolean true/false to indicate a yes or no regardless of template.
*/
static function isAuthorizedToCreate(
	$userId,
	$publisherId,
	$streamType,
	$relate = array())
{
	// A user can always publish streams under their own name
	if ($publisherId == $userId) {
		return true;
	}

	// Otherwise permission must come from a template for this stream type:
	// first the publisher's own template, then the one with an empty publisherId
	$template = new Streams_Stream();
	$template->publisherId = $publisherId;
	$template->name = $streamType.'/';
	$template->type = 'Streams/template';
	$found = $template->retrieve();
	if (!$found) {
		$template->publisherId = '';
		$found = $template->retrieve();
	}
	if (!$found) {
		return false;
	}
	$template->calculateAccess($userId, false, $publisherId);
	if ($template->testAdminLevel('own')) {
		return $template;
	}

	// Last chance: the user may be allowed to create the stream
	// as something related to an existing category stream
	if (empty($relate['streamName'])) {
		return false;
	}
	$categoryPublisherId = empty($relate['publisherId'])
		? $publisherId
		: $relate['publisherId'];
	$relationType = empty($relate['type']) ? '' : $relate['type'];

	$category = Streams::fetchOne(
		$userId, $categoryPublisherId, $relate['streamName']
	);
	if (!$category or !$category->testWriteLevel('relate')) {
		return false;
	}

	// Find the template for the category's type, with the same fallback as above
	$categoryTemplate = new Streams_Stream();
	$categoryTemplate->publisherId = $category->publisherId;
	$categoryTemplate->name = $category->type.'/';
	$categoryTemplate->type = 'Streams/template';
	$found = $categoryTemplate->retrieve();
	if (!$found) {
		$categoryTemplate->publisherId = '';
		$found = $categoryTemplate->retrieve();
	}
	if (!$found) {
		return false;
	}

	// Authorized only if the stream template is related to the category template
	// with the requested relation type
	$relation = new Streams_RelatedTo();
	$relation->toPublisherId = $categoryTemplate->publisherId;
	$relation->toStreamName = $categoryTemplate->name;
	$relation->type = $relationType;
	$relation->fromPublisherId = $template->publisherId;
	$relation->fromStreamName = $template->name;
	return $relation->retrieve() ? $template : false;
}
/**
* Creates a new stream in the system
* @method create
* @static
* @param {string} $asUserId The user who is attempting to create the stream.
* @param {string} $publisherId The id of the user to publish the stream.
* @param {string|array} [$type=null] The type of the stream to create.
* Required unless you specify the name of the stream in the options,
* and you've specified its type in a config file listed under a config named
* "Streams/userStreams/$module".
* @param {array} [$fields=array()] Use this to set additional fields for the stream:
* @param {string} [$fields.title=null] You can set the stream's title
* @param {string} [$fields.icon=null] You can set the stream's icon
* @param {string} [$fields.content=null] You can set the stream's content
* @param {string|array} [$fields.attributes=null] You can set the stream's attributes directly as a JSON string
* @param {string|integer} [$fields.readLevel=null] You can set the stream's read access level, see Streams::$READ_LEVEL
* @param {string|integer} [$fields.writeLevel=null] You can set the stream's write access level, see Streams::$WRITE_LEVEL
* @param {string|integer} [$fields.adminLevel=null] You can set the stream's admin access level, see Streams::$ADMIN_LEVEL
* @param {string} [$fields.name=null] Here you can specify an exact name for the stream to be created. Otherwise a unique one is generated automatically.
* @param {boolean} [$fields.skipAccess=false] Skip all access checks when creating and relating the stream.
* @param {array} [$relate=array()]
* Fill this out in order to relate the newly created stream to a category stream,
* and also inheritAccess from it. When using this option, a user may be authorized
* to create a stream they would otherwise not be authorized to create.
* This happens when the asUserId user has a writeLevel of at least "relate" in the
* existing category stream, and the publisherId user has a template for this stream
* type that is related to either the category stream, or a template for the
* category stream's type.
* @param {string} [$relate.publisherId] The id of the user publishing the category stream, defaults to $publisherId
* @param {string} [$relate.streamName] The name of the category stream
* @param {string} [$relate.type] The type of relation, defaults to ""
* @param {string} [$relate.weight] To set the weight for the relation. You can pass a numeric value here, or something like "max+1" to make the weight 1 greater than the current MAX(weight)
* @param {array} [&$result=null] Optionally pass a reference here to hold the result of calling Streams::relate().
* @return {Streams_Stream} Returns the stream that was created.
* @throws {Users_Exception_NotAuthorized}
*/
static function create(
	$asUserId,
	$publisherId,
	$type = null,
	$fields = array(),
	$relate = null,
	&$result = null)
{
	if (!isset($fields)) {
		$fields = array();
	}
	$skipAccess = Q::ifset($fields, 'skipAccess', false);
	if (!isset($asUserId)) {
		$asUserId = Users::loggedInUser(true)->id;
	} else if ($asUserId instanceof Users_User) {
		$asUserId = $asUserId->id;
	}
	if ($publisherId instanceof Users_User) {
		$publisherId = $publisherId->id;
	}

	// The category stream's publisher defaults to the new stream's publisher.
	// Streams::isAuthorizedToCreate() applies the same default, but only to its
	// own copy of $relate, so it has to be applied here as well before
	// $relate['publisherId'] is read further down.
	if (!empty($relate['streamName']) and empty($relate['publisherId'])) {
		$relate['publisherId'] = $publisherId;
	}

	// OK we are good to go!
	$stream = new Streams_Stream;
	$stream->publisherId = $publisherId;
	if (!empty($fields['name'])) {
		// Streams with a name listed in the user streams config
		// take their field values (including the type) from there
		$p = Streams::userStreamsTree();
		if ($info = $p->get($fields['name'], array())) {
			foreach (Base_Streams_Stream::fieldNames() as $f) {
				if (isset($info[$f])) {
					$stream->$f = $info[$f];
				}
			}
		}
	}
	if (!isset($stream->type)) {
		$stream->type = $type;
	}

	$authorized = self::isAuthorizedToCreate(
		$asUserId, $publisherId, $stream->type, $relate
	);
	if (!$authorized and !$skipAccess) {
		throw new Users_Exception_NotAuthorized();
	}

	// prepare attributes field
	if (isset($fields['attributes'])) {
		if (is_array($fields['attributes'])) {
			$fields['attributes'] = Q::json_encode($fields['attributes']);
		} else if (is_string($fields['attributes'])) {
			// decoded only to validate the JSON; the string itself is stored
			Q::json_decode($fields['attributes']); // may throw an exception
		}
	}

	// copy the recognized fields passed by the caller onto the stream
	$fieldNames = Streams::getExtendFieldNames($type);
	$fieldNames[] = 'name';
	foreach ($fieldNames as $f) {
		if (isset($fields[$f])) {
			$stream->$f = $fields[$f];
		}
	}

	// ready to persist this stream to the database
	if (!empty($relate['streamName'])) {
		// The new stream inherits access from the category stream,
		// and from whatever the category stream itself inherits from
		$rs = Streams::fetchOne(
			$asUserId,
			$relate['publisherId'],
			$relate['streamName']
		);
		$inheritAccess = ($rs and $rs->inheritAccess)
			? Q::json_decode($rs->inheritAccess)
			: array();
		$newInheritAccess = array($relate['publisherId'], $relate['streamName']);
		if (!in_array($newInheritAccess, $inheritAccess)) {
			$inheritAccess[] = $newInheritAccess;
		}
		$stream->inheritAccess = Q::json_encode($inheritAccess);
	}
	$stream->set('createdAsUserId', $asUserId);
	$stream->save();
	$stream->post($asUserId, array(
		'type' => 'Streams/created',
		'content' => '',
		'instructions' => $stream->toArray()
	), true);

	// relate the stream to category stream, if any
	if (!empty($relate['streamName'])) {
		$relationType = isset($relate['type']) ? $relate['type'] : '';
		$result = Streams::relate(
			$asUserId,
			$relate['publisherId'],
			$relate['streamName'],
			$relationType,
			$stream->publisherId,
			$stream->name,
			array(
				'weight' => isset($relate['weight']) ? $relate['weight'] : null,
				'skipAccess' => $skipAccess
			)
		);
	}

	// Prime the Streams::fetch() cache with the new stream, for all fields ('*')
	self::$fetch[$asUserId][$publisherId][$stream->name] = array('*' => $stream);
	return $stream;
}
/**
* Takes some information out of an existing set of streams
* @method take
* @static
* @param {array} $streams
* @param {string} $name
* @param {string} $readLevel
* Test each stream for at least this read level.
* If the test fails, return null in its stead.
* @param {string|array} $field='content'
* Optional. Defaults to "content".
* Can be an array of fields, in which case the function returns an array.
* @param {boolean} [$escape=false]
* Defaults to false. If true, escapes the values as HTML
* @return {mixed}
* Returns the value of the field, or an array of values, depending on
* whether $field is an array or a string
*/
static function take(
	$streams,
	$name,
	$readLevel,
	$field = 'content',
	$escape = false)
{
	if (!isset($streams[$name])) {
		return null;
	}
	$stream = $streams[$name];
	$wantsMany = is_array($field);
	$values = array();
	foreach (($wantsMany ? $field : array($field)) as $f) {
		// A single unset field, or insufficient read access,
		// voids the entire result rather than just that field
		if (!isset($stream->$f) or !$stream->testReadLevel($readLevel)) {
			return null;
		}
		$values[$f] = $escape
			? Q_Html::text($stream->$f)
			: $stream->$f;
	}
	// A list of fields yields an array keyed by field name; a single field yields its value
	return $wantsMany ? $values : reset($values);
}
/**
* Get all the streams starting with "Streams/user/" for a particular user.
* This is cached via Streams::fetch, so you can call it repeatedly.
* @method forUser
* @static
* @param {string} $asUserId The id of the user who's supposed to be accessing the stream.
* @param {string} $publisherId The id of the user who is publishing the streams.
* @return {array}
*/
static function forUser($asUserId, $publisherId)
{
	// Both ids are required; if either is missing, null is returned
	if (isset($asUserId) and isset($publisherId)) {
		$userStreams = new Db_Range('Streams/user/', true, false, null);
		return Streams::fetch($asUserId, $publisherId, $userStreams, '*');
	}
	return null;
}
/**
* A shorthand to get a stream whose name starts with "Streams/user/".
* @method my
* @static
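* @param {string} $name
* The name of the stream, used as the key to look it up
* among the streams returned by Streams::forUser()
* @param {string|array} $field='content'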
* Optional. Defaults to "content".
* Can be an array of fields, in which case the function returns an array.
* @param {boolean} [$escape=false]
* If true, escapes as HTML
* @return {mixed}
* Returns the value of the field, or an array of values, depending on
* whether $field is an array or a string
*/
static function my($name, $field = 'content', $escape = false)
{
	$me = Users::loggedInUser();
	if ($me) {
		// The logged-in user is both the fetcher and the publisher here (cached).
		// Since it's our stream, the testReadLevel will always succeed.
		$mine = Streams::forUser($me->id, $me->id);
		return Streams::take($mine, $name, 0, $field, $escape);
	}
	return null;
}
/**
* Get the publisher id from the request, if it can be deduced
* @method requestedPublisherId
* @static
* @param {boolean} $throwIfMissing=false
* Optional. If true, throws an exception if the publisher id cannot be deduced
* @param {array|string} [$uri=Q_Dispatcher::uri()]
* An array or string representing a uri to use instead of the Q_Dispatcher::uri()
* @return {string|null}
* The id of the publisher user
* @throws {Users_Exception_NoSuchUser}
* If the URI contains an invalid "username"
* @throws {Q_Exception_RequiredField}
* If the username can't be deduced, this is thrown
*/
static function requestedPublisherId($throwIfMissing = false, $uri = null)
{
	// An override, when set, wins over anything found in the request
	if (isset(self::$requestedPublisherId_override)) {
		return self::$requestedPublisherId_override;
	}
	$uri = isset($uri) ? $uri : Q_Dispatcher::uri();
	$found = null;
	// Sources are consulted in order of precedence; the first one present wins
	if (isset($_REQUEST['publisherId'])) {
		$found = $_REQUEST['publisherId'];
	} elseif (isset($uri->publisherId)) {
		$found = $uri->publisherId;
	} elseif (isset($uri->username)) {
		// Resolve the username from the URI to a user id
		$user = new Users_User();
		$user->username = $uri->username; // Warning: SECONDARY_LOOKUP
		if (!$user->retrieve()) {
			throw new Users_Exception_NoSuchUser(array(), 'username');
		}
		$found = $user->id;
	} elseif (Streams::$followedInvite) {
		// Fall back to the publisher of the invite that was followed
		$found = Streams::$followedInvite->publisherId;
	} elseif ($throwIfMissing) {
		throw new Q_Exception_RequiredField(
			array('field' => 'publisher id'),
			'publisherId'
		);
	}
	// In strict mode, whatever was found must also be a string
	if ($throwIfMissing and !is_string($found)) {
		throw new Q_Exception_WrongType(array('field' => 'publisherId', 'type' => 'string'));
	}
	return $found;
}
/**
* Get the stream name from the request, if it can be deduced.
* Checks $_REQUEST['streamName'], then tries $_REQUEST['name'].
* If both are empty, tries Q_Dispatcher::uri() and returns "{$uri->name_prefix}{$uri->name}",
* which is useful when the URL contains just the last part of a stream's name.
* @method requestedName
* @static
* @param {boolean} $throwIfMissing=false
* Optional. If true, throws an exception if the stream name cannot be deduced
* @param {string} [$returnAs='string'] Can be "array" or "string".
* @param {array|string} [$uri=Q_Dispatcher::uri()]
* An array or string representing a uri to use instead of the Q_Dispatcher::uri()
* @return {string}
* The name of the stream
* @throws {Q_Exception_RequiredField}
* If the name can't be deduced, this is thrown
*/
static function requestedName($throwIfMissing = false, $returnAs = 'string', $uri = null)
{
	// An override, when set, is returned as-is (regardless of $returnAs)
	if (isset(self::$requestedName_override)) {
		return self::$requestedName_override;
	}
	if (!isset($uri)) {
		$uri = Q_Dispatcher::uri();
	}
	$name = $result = null;
	// $fieldName tracks which input supplied the value, for the error message below
	$fieldName = 'streamName';
	if (isset($_REQUEST['streamName'])) {
		$result = $_REQUEST['streamName'];
	} else if (isset($_REQUEST['name'])) {
		$result = $_REQUEST['name'];
		$fieldName = 'name';
	} else if (isset($uri->streamName)) {
		$result = $uri->streamName;
	} else if (isset($uri->name)) {
		$fieldName = 'name';
		$result = $uri->name;
	}
	if (isset($result)) {
		// The name may arrive as an array of path segments
		if (is_array($result)) {
			$result = implode('/', $result);
		}
		// The URI may carry a prefix when the URL holds only the tail of the name
		$name = isset($uri->name_prefix) ? $uri->name_prefix.$result : $result;
	} else if (Streams::$followedInvite) {
		// Fall back to the stream of the invite that was followed
		$name = Streams::$followedInvite->streamName;
	} else if ($throwIfMissing) {
		throw new Q_Exception_RequiredField(
			array('field' => 'stream name'),
			'streamName'
		);
	}
	if ($throwIfMissing && !is_string($name)) {
		throw new Q_Exception_WrongType(array('field' => $fieldName, 'type' => 'string or array'));
	}
	if ($returnAs === 'array' and is_string($name)) {
		// Split the final name, not the raw request value: $name includes
		// any name_prefix and is also set when it came from the followed invite,
		// in which case $result is null.
		$name = explode('/', $name);
	}
	return $name;
}
/**
* Get the stream type from the request, if it can be deduced
* @method requestedType
* @static
* @param {boolean} $throwIfMissing=false
* Optional. If true, throws an exception if the stream type cannot be deduced
* @return {string}
* The type of the stream
* @throws {Q_Exception_RequiredField}
* If the type can't be deduced, this is thrown
*/
static function requestedType($throwIfMissing = false)
{
	$uri = Q_Dispatcher::uri();
	// Request parameters take precedence, "streamType" before the generic "type"
	foreach (array('streamType', 'type') as $key) {
		if (isset($_REQUEST[$key])) {
			return $_REQUEST[$key];
		}
	}
	// Otherwise fall back to the dispatched URI
	if (isset($uri->type)) {
		return $uri->type;
	}
	if (!$throwIfMissing) {
		return null;
	}
	throw new Q_Exception_RequiredField(
		array('field' => 'stream type'),
		'streamType'
	);
}
/**
* Get the message type from the request, if it can be deduced
* @method requestedMessageType
* @static
* @param {boolean} $throwIfMissing=false
* Optional. If true, throws an exception if the message type cannot be deduced
* @return {string}
* The type of the message
* @throws {Q_Exception_RequiredField}
* If the type can't be deduced, this is thrown
*/
static function requestedMessageType($throwIfMissing = false)
{
	$uri = Q_Dispatcher::uri();
	// Request parameters take precedence, "messageType" before the generic "type"
	foreach (array('messageType', 'type') as $key) {
		if (isset($_REQUEST[$key])) {
			return $_REQUEST[$key];
		}
	}
	// Otherwise fall back to the dispatched URI
	if (isset($uri->type)) {
		return $uri->type;
	}
	if (!$throwIfMissing) {
		return null;
	}
	throw new Q_Exception_RequiredField(
		array('field' => 'message type'),
		array('type')
	);
}
/**
* Get the stream field from the request, if it can't be deduced throws error
* @method requestedField
* @static
* @param {string} $field
* The name of the field
* @param {boolean} $throwIfMissing=false
* Optional. If true, throws an exception if the stream field cannot be deduced
* @param {mixed} $default=null
* Is returned if field is not set
* @return {string}
* The value of the field
* @throws {Q_Exception_RequiredField}
* If the field value can't be deduced, this is thrown
*/
static function requestedField($field, $throwIfMissing = false, $default = null)
{
	$uri = Q_Dispatcher::uri();
	// 1. An explicit request parameter wins
	if (isset($_REQUEST[$field])) {
		return $_REQUEST[$field];
	}
	// 2. Then the dispatched URI; arrays of segments are joined with "/"
	if (isset($uri->$field)) {
		$fromUri = $uri->$field;
		return is_array($fromUri) ? implode('/', $fromUri) : $fromUri;
	}
	// 3. Then the special "Streams.$field" request value.
	// Kept in its own variable so that $field still holds the field name
	// when the exception below is built.
	$special = Q_Request::special("Streams.$field", $default);
	if ($special) {
		return $special;
	}
	if ($throwIfMissing) {
		throw new Q_Exception_RequiredField(
			array('field' => "stream $field"),
			$field
		);
	}
	return $default;
}
/**
* Get the fields that have been requested in the request, otherwise '*'
* @method requestedFields
* @static
* @return {array|string}
* An array or string of fields to select
* @throws {Q_Exception}
* If requested field name is invalid
*/
static function requestedFields()
{
	// No explicit selection means "all fields"
	if (empty($_REQUEST['fields'])) {
		return '*';
	}
	$requested = $_REQUEST['fields'];
	if (!is_array($requested)) {
		// a comma-separated list was passed
		$requested = explode(',', $requested);
	}
	// publisherId and name are always appended when absent
	foreach (array('publisherId', 'name') as $required) {
		if (!in_array($required, $requested)) {
			$requested[] = $required;
		}
	}
	return $requested;
}
/**
* Produce user's display name
* @method displayName
* @static
* @param {string|Users_User} $userId
* Can be Users_User object or a string containing a user id
* @param {array} $streams=null
* An array of streams fetched for this user.
* If it is null, we fetch them as the logged-in user.
* @param {array} $options=array()
* @param {boolean} [$options.short] Show one part of the name only
* @param {boolean} [$options.show] The parts of the name to show. Can have the letters "f", "l", "u" in any order.
* @param {boolean} [$options.html] If true, encloses the first name, last name, username in span tags. If an array, then it will be used as the attributes of the html.
* @param {boolean} [$options.escape] If true, does HTML escaping of the retrieved field
* @param {string} [$options.asUserId=Users::loggedInUser()] Optionally override which user to get the display name as
* @param {string} [$options.fullAccess=false] if true, sets the $asUserId = $userId
* @param {string} [$fallback='Someone'] HTML to return if there is no info to get displayName from.
* @param {string|null} [$fallback='Someone']
* What to return if there is no info to get displayName from.
* @return {string|null}
*/
static function displayName($userId, $options = array(), $fallback = 'Someone')
{
	// Accept either a user object or a plain id
	if ($userId instanceof Users_User) {
		$userId = $userId->id;
	}
	// Decide on whose behalf the avatar is fetched:
	// explicit asUserId, then fullAccess (the user themselves), then the logged-in user
	if (!empty($options['asUserId'])) {
		$viewerId = $options['asUserId'];
	} elseif (!empty($options['fullAccess'])) {
		$viewerId = $userId;
	} else {
		$viewer = Users::loggedInUser();
		$viewerId = $viewer ? $viewer->id : "";
	}
	$avatar = Streams_Avatar::fetch($viewerId, $userId);
	if (!$avatar) {
		// no avatar available, so there is nothing to build a name from
		return $fallback;
	}
	return $avatar->displayName($options, $fallback);
}
/**
* Gets relations. At most one of toStreamName, fromStreamName can be an array.
* @method getRelations
* @private
* @param {string} $asUserId
* The user who is fetching
* @param {string} $toPublisherId
* The publisher of the category
* @param {string|array} $toStreamName
* The name of the category. May be turned into an array.
* @param {string} $type
* The type of the relation.
* @param {string} $fromPublisherId
* The publisher of the member stream(s)
* @param {string|array} $fromStreamName
* The name of the member stream(s). May be turned into an array.
* @param {array} $relatedTo reference to array of Streams_RelatedTo to fill
* @param {array} $relatedFrom reference to array of Streams_RelatedFrom to fill
* @param {array} $categories reference to array of Streams_Stream to fill with categories
* @param {array} $streams reference to array of Streams_Stream to fill with streams
* @param {array} $options=array() An array of options that can include:
* @param {boolean} [$options.skipAccess=false] If true, skips the access checks and just relates the stream to the category
*/
private static function getRelations(
	&$asUserId,
	$toPublisherId,
	&$toStreamName,
	$type,
	$fromPublisherId,
	&$fromStreamName,
	&$relatedTo,
	&$relatedFrom,
	&$categories,
	&$streams,
	&$arrayField,
	&$options = array())
{
	// Normalize the acting user: default to the logged-in user, accept objects
	if (!isset($asUserId)) {
		$asUserId = Users::loggedInUser(true)->id;
	} else if ($asUserId instanceof Users_User) {
		$asUserId = $asUserId->id;
	}
	if ($toPublisherId instanceof Users_User) {
		$toPublisherId = $toPublisherId->id;
	}
	if (!isset($options)) {
		$options = array();
	}
	// Work out which side holds the list of names; $arrayField is reported
	// back to the caller by reference and is also used to index fetched rows.
	if (is_array($toStreamName)) {
		if (is_array($fromStreamName)) {
			throw new Q_Exception("toStreamName and fromStreamName can't both be arrays");
		}
		$arrayField = 'toStreamName';
	} else if (is_array($fromStreamName)) {
		// $toStreamName deliberately stays a string in this case
		$arrayField = 'fromStreamName';
	} else {
		$toStreamName = array($toStreamName);
		$arrayField = 'toStreamName';
	}

	// Check access to category stream, the stream to which other streams are related
	$categories = Streams::fetch($asUserId, $toPublisherId, $toStreamName);
	if (empty($options['skipAccess'])) {
		// $toStreamName is a plain string when $fromStreamName is the array;
		// iterating it directly would skip the access check entirely.
		$categoryNames = is_array($toStreamName) ? $toStreamName : array($toStreamName);
		foreach ($categoryNames as $sn) {
			if (empty($categories[$sn])) {
				throw new Q_Exception_MissingRow(array(
					'table' => 'Stream',
					'criteria' => Q::json_encode(array(
						'publisherId' => $toPublisherId,
						'streamName' => $sn
					))
				));
			}
			if (!$categories[$sn]->testWriteLevel('relate')) {
				throw new Users_Exception_NotAuthorized();
			}
		}
	}

	// Fetch member streams, the streams which are being related
	$streams = Streams::fetch($asUserId, $fromPublisherId, $fromStreamName);

	$criteria = compact(
		'toPublisherId', 'toStreamName',
		'type', 'fromPublisherId', 'fromStreamName'
	);

	// Fetch relatedTo, unless the caller passed false to skip it
	if ($relatedTo !== false) {
		$relatedTo = Streams_RelatedTo::select()
		->where($criteria)
		->fetchDbRows(null, null, $arrayField);
	}

	// Fetch relatedFrom, unless the caller passed false to skip it
	if ($relatedFrom !== false) {
		$relatedFrom = Streams_RelatedFrom::select()
		->where($criteria)
		->fetchDbRows(null, null, $arrayField);
	}

	// The two sides can only be compared when both were fetched.
	// Otherwise every row on the fetched side would look orphaned and be deleted.
	if ($relatedTo === false or $relatedFrom === false) {
		return;
	}

	// Recover from inconsistency:
	// if one exists but not the other, delete both
	$removeRT = $removeRF = $fieldRT = $fieldRF = array();
	foreach ($relatedTo as $sn => $rt) {
		if (empty($relatedFrom[$sn])) {
			$removeRT[] = $sn;
			$fieldRT[] = $rt->$arrayField;
		}
	}
	foreach ($relatedFrom as $sn => $rf) {
		if (empty($relatedTo[$sn])) {
			$removeRF[] = $sn;
			$fieldRF[] = $rf->$arrayField;
		}
	}
	if ($removeRT) {
		// drop the orphans from the result and from the database
		foreach ($removeRT as $sn) {
			unset($relatedTo[$sn]);
		}
		$criteria2 = $criteria;
		$criteria2[$arrayField] = $fieldRT;
		Streams_RelatedTo::delete()
			->where($criteria2)
			->execute();
	}
	if ($removeRF) {
		foreach ($removeRF as $sn) {
			unset($relatedFrom[$sn]);
		}
		$criteria2 = $criteria;
		$criteria2[$arrayField] = $fieldRF;
		Streams_RelatedFrom::delete()
			->where($criteria2)
			->execute();
	}
}
/**
* Make the stream a member of category or other aggregating stream,
* First parameter set - where to add, Second parameter set - what to add
* NOTE: this currently only works when fromPublisherId and toPublisherId are on same Q cluster
* @method relate
* @static
* @param {string} $asUserId
* The user who is making the aggregator operation on the stream (adding the stream to a category)
* @param {string} $toPublisherId
* The user who has published the category stream
* @param {string|array} $toStreamName
* The name of the category stream. Pass an array of strings to relate a single stream
* to multiple categories, but in that case make sure fromStreamName is a string.
* @param {string} $type
* The type of the relation
* @param {string} $fromPublisherId
* The user who has published the related stream
* @param {string} $fromStreamName
* The name of the related stream. Pass an array of strings to relate multiple streams
* to a single category, but in that case make sure toStreamName is a string.
* @param {array} $options=array()
* An array of options that can include:
* @param {boolean} [$options.skipAccess=false] If true, skips the access checks and just relates the stream to the category
* @param {boolean} [$options.skipMessageTo=false] If true, skips posting the Streams/relatedTo message to the "to" streams
* @param {boolean} [$options.skipMessageFrom=false] If true, skips posting the Streams/relatedFrom message to the "from" streams
* @param {double|string} [$options.weight] Pass a numeric value here, or something like "max+1" to make the weight 1 greater than the current MAX(weight)
* @param {array} [$options.extra] Can be array of ($streamName => $extra) info
* to save in the "extra" field.
* @return {array|boolean}
* Returns false if the operation was canceled by a hook
* Returns true if relation was already there
* Otherwise returns array with keys "messagesTo" and "messagesFrom" and values of type Streams_Message
*/
static function relate(
	$asUserId,
	$toPublisherId,
	$toStreamName,
	$type,
	$fromPublisherId,
	$fromStreamName,
	$options = array())
{
	// Normalizes the arguments, checks access and loads existing relations.
	// Everything from $relatedToArray onwards is filled in by reference,
	// and $arrayField names whichever of the two stream-name arguments is the array.
	self::getRelations(
		$asUserId,
		$toPublisherId,
		$toStreamName,
		$type,
		$fromPublisherId,
		$fromStreamName,
		$relatedToArray,
		$relatedFromArray,
		$categories,
		$streams,
		$arrayField,
		$options);

	// calculate weights
	// A weight such as "max+1" asks for an increment over the current maximum;
	// this only matters if at least one new relation will be inserted.
	$calculateWeights = null;
	foreach ($$arrayField as $sn) {
		if (isset($relatedToArray[$sn])) {
			continue; // already related
		}
		if (isset($options['weight'])) {
			$parts = explode('+', "$options[weight]");
			if (count($parts) > 1) {
				$calculateWeights = $parts[1];
				break;
			}
		}
	}
	if (isset($calculateWeights)) {
		if (!is_numeric($calculateWeights) or $calculateWeights <= 0) {
			throw new Q_Exception_WrongValue(array(
				'field' => 'weight',
				'range' => 'a positive numeric value'
			), 'weight');
		}
		// Look up the current maximum weight of each category
		$rows = Streams_RelatedTo::select('toStreamName, MAX(weight) w')
			->where(compact('toPublisherId', 'toStreamName', 'type'))
			->groupBy('toStreamName')
			->ignoreCache()
			->fetchAll(PDO::FETCH_ASSOC);
		$maxWeights = array();
		foreach ($rows as $r) {
			$maxWeights[$r['toStreamName']] = $r['w'];
		}
	}

	$newRT = array();   // rows to insert into relatedTo, keyed by stream name
	$newRF = array();   // rows to insert into relatedFrom, keyed by stream name
	$newRTT = array();  // relatedTo totals to insert or increment
	$newRFT = array();  // relatedFrom totals to insert or increment
	$weights2 = array(); // running weight per category
	$extras = array();  // "extra" value per stream name, reused when posting messages
	foreach ($$arrayField as $sn) {
		if (isset($relatedToArray[$sn])) {
			continue; // already related
		}
		$category = ($arrayField === 'toStreamName') ? $categories[$sn] : reset($categories);
		$stream = ($arrayField === 'fromStreamName') ? $streams[$sn] : reset($streams);
		$extra = null;
		if (!empty($options['extra'][$sn])) {
			$extra = $options['extra'][$sn];
			$extra = is_string($extra) ? $extra : Q::json_encode($extra);
			$extras[$sn] = $extra;
		}

		/**
		 * @event Streams/relateTo/$categoryType {before}
		 * @param {string} asUserId
		 * @param {Streams_Stream} category
		 * @param {Streams_Stream} stream
		 * @param {string} [extra] present only when extra info was supplied
		 * @return {false} To cancel further processing
		 */
		$beforeParams = compact('asUserId', 'category', 'stream');
		if (isset($extra)) {
			// added conditionally, instead of compact() on a possibly undefined variable
			$beforeParams['extra'] = $extra;
		}
		if (false === Q::event(
			"Streams/relateTo/{$category->type}",
			$beforeParams,
			'before'
		)) {
			continue;
		}

		/**
		 * @event Streams/relateFrom/$streamType {before}
		 * @param {string} asUserId
		 * @param {Streams_Stream} category
		 * @param {Streams_Stream} stream
		 * @return {false} To cancel further processing
		 */
		if (false === Q::event(
			"Streams/relateFrom/{$stream->type}",
			compact('asUserId', 'category', 'stream'),
			'before'
		)) {
			continue;
		}

		// the category this particular relation points to
		$tsn = ($arrayField === 'toStreamName') ? $sn : $toStreamName;
		$newRT[$sn] = $newRF[$sn] = compact(
			'toPublisherId', 'type', 'fromPublisherId'
		);
		if (isset($extra)) {
			$newRT[$sn]['extra'] = $extra;
		}
		if ($calculateWeights) {
			// each new relation goes $calculateWeights above the previous one
			if (!isset($weights2[$tsn])) {
				$weights2[$tsn] = isset($maxWeights[$tsn]) ? $maxWeights[$tsn] : 0;
			}
			$weights2[$tsn] += $calculateWeights;
			$newRT[$sn]['weight'] = $weights2[$tsn];
		} else if (isset($options['weight'])) {
			$weights2[$tsn] = $newRT[$sn]['weight'] = $options['weight'];
		}
		foreach (array('toStreamName', 'fromStreamName') as $f) {
			$newRT[$sn][$f] = $newRF[$sn][$f] = ($f === $arrayField) ? $sn : $$f;
		}
		$newRTT[] = array(
			'toPublisherId' => $category->publisherId,
			'toStreamName' => $category->name,
			'relationType' => $type,
			'fromStreamType' => $stream->type,
			'relationCount' => 1
		);
		$newRFT[] = array(
			'fromPublisherId' => $stream->publisherId,
			'fromStreamName' => $stream->name,
			'relationType' => $type,
			'toStreamType' => $category->type,
			'relationCount' => 1
		);
	}
	// Insert/update all the relatedTo and relatedFrom rows
	Streams_RelatedTo::insertManyAndExecute($newRT);
	Streams_RelatedFrom::insertManyAndExecute($newRF);
	// Insert/update all the corresponding totals
	Streams_RelatedToTotal::insertManyAndExecute($newRTT, array(
		'onDuplicateKeyUpdate' => array(
			'relationCount' => new Db_Expression('relationCount + 1')
		)
	));
	Streams_RelatedFromTotal::insertManyAndExecute($newRFT, array(
		'onDuplicateKeyUpdate' => array(
			'relationCount' => new Db_Expression('relationCount + 1')
		)
	));

	// The last path segment of the relation type, with a trailing "s" dropped
	$typeParts = explode('/', $type);
	$displayType = end($typeParts);
	if (substr($displayType, -1) === 's') {
		$displayType = substr($displayType, 0, -1);
	}

	$relatedFrom_messages = array();
	$relatedTo_messages = array();
	// Only relations inserted above get messages and "after" events.
	// Ones that already existed, or were canceled by a "before" hook, are skipped.
	foreach ($newRT as $sn => $row) {
		$category = ($arrayField === 'toStreamName') ? $categories[$sn] : reset($categories);
		$stream = ($arrayField === 'fromStreamName') ? $streams[$sn] : reset($streams);
		// check both before touching any of their properties
		if (!$stream or !$category) {
			continue;
		}
		// the weight stored for this relation, not the category's running total
		$weight = isset($row['weight']) ? $row['weight'] : null;

		$fromUri = $stream->uri();
		$fromUrl = $stream->url();
		$fromIcon = $stream->icon;
		$fromTitle = $stream->title;
		$fromType = $stream->type;
		$fromDisplayType = Streams_Stream::displayType($fromType);

		$toUri = $category->uri();
		$toUrl = $category->url();
		$toIcon = $category->icon;
		$toTitle = $category->title;
		$toType = $category->type;
		$toDisplayType = Streams_Stream::displayType($toType);

		$categoryName = explode('/', $category->name);
		$streamName = explode('/', $stream->name);

		// NOTE(review): the event docs list relatedTo / relatedFrom params, but no such
		// variables were ever defined in this method, so they were never passed along.
		$params = compact(
			'asUserId', 'category', 'stream',
			'fromUri', 'fromUrl',
			'fromIcon', 'fromTitle', 'fromType', 'fromDisplayType',
			'toUri', 'toUrl',
			'toIcon', 'toTitle', 'toType', 'toDisplayType',
			'displayType', 'categoryName', 'streamName'
		);
		if (isset($extras[$sn])) {
			// this stream's own extra, not one left over from another iteration
			$params['extra'] = $extras[$sn];
		}

		// Config may override the urls, first per relation type and then via "*".
		// The default (null) is passed after the path, as the description lookups do.
		if ($u = Streams_Stream::getConfigField($category->type,
			array('relatedTo', $type, 'url'),
			Streams_Stream::getConfigField($category->type, array(
				'relatedTo', '*', 'url'
			), null)
		)) {
			$fromUrl = Q_Uri::url(Q_Handlebars::renderSource($u, $params));
		}
		if ($u = Streams_Stream::getConfigField($stream->type,
			array('relatedFrom', $type, 'url'),
			Streams_Stream::getConfigField($stream->type, array(
				'relatedFrom', '*', 'url'
			), null)
		)) {
			$toUrl = Q_Uri::url(Q_Handlebars::renderSource($u, $params));
		}

		$description = Q_Handlebars::renderSource(
			Streams_Stream::getConfigField($category->type,
				array('relatedTo', $type, 'description'),
				Streams_Stream::getConfigField($category->type, array(
					'relatedTo', '*', 'description'
				), "New $fromDisplayType added"
			)),
			$params
		);

		// Send Streams/relatedTo message to a stream
		// node server will be notified by Streams_Message::post
		// DISTRIBUTED: in the future, the publishers may be on separate domains
		// so posting this message may require internet communication.
		$instructions = compact(
			'fromPublisherId', 'type', 'weight', 'displayType',
			'fromUrl', 'toUrl',
			'fromIcon', 'fromTitle', 'fromType', 'fromDisplayType', 'description'
		);
		$instructions['url'] = $instructions['fromUrl'];
		$instructions['fromStreamName'] = $stream->name;
		$relatedTo_messages[$toPublisherId][$category->name][] = array(
			'type' => 'Streams/relatedTo',
			'instructions' => $instructions
		);

		$description = Q_Handlebars::renderSource(
			Streams_Stream::getConfigField($stream->type, array('relatedFrom', $type, 'description'),
				Streams_Stream::getConfigField($stream->type, array('relatedFrom', '*', 'description'),
					"Added to {{toDisplayType}} as $displayType"
			)),
			$params
		);

		// Send Streams/relatedFrom message to a stream
		// node server will be notified by Streams_Message::post
		// DISTRIBUTED: in the future, the publishers may be on separate domains
		// so posting this message may require internet communication.
		$instructions = compact(
			'toPublisherId', 'type', 'weight', 'displayType',
			'fromUrl', 'toUrl', 'fromUri', 'toUri',
			'toIcon', 'toTitle', 'toType', 'toDisplayType', 'description'
		);
		$instructions['url'] = $instructions['toUrl'];
		$instructions['toStreamName'] = $category->name;
		$relatedFrom_messages[$fromPublisherId][$stream->name][] = array(
			'type' => 'Streams/relatedFrom',
			'instructions' => $instructions
		);

		/**
		 * @event Streams/relateFrom/$streamType {after}
		 * @param {string} asUserId
		 * @param {Streams_Stream} category
		 * @param {Streams_Stream} stream
		 * @param {string} [extra] present only when extra info was supplied
		 */
		$afterParams = compact('asUserId', 'category', 'stream');
		if (isset($extras[$sn])) {
			$afterParams['extra'] = $extras[$sn];
		}
		Q::event(
			"Streams/relateFrom/{$stream->type}",
			$afterParams,
			'after'
		);

		/**
		 * @event Streams/relateTo/$categoryType {after}
		 * @param {string} asUserId
		 * @param {Streams_Stream} category
		 * @param {Streams_Stream} stream
		 */
		Q::event(
			"Streams/relateTo/{$category->type}",
			compact('asUserId', 'category', 'stream'),
			'after'
		);
	}

	// A key appears in the result only when its messages were actually posted
	$result = array();
	if (empty($options['skipMessageTo'])) {
		list($messagesTo, $s) = Streams_Message::postMessages($asUserId, $relatedTo_messages, true);
		$result['messagesTo'] = $messagesTo;
	}
	if (empty($options['skipMessageFrom'])) {
		list($messagesFrom, $s) = Streams_Message::postMessages($asUserId, $relatedFrom_messages, true);
		$result['messagesFrom'] = $messagesFrom;
	}
	return $result;
}
/**
* Remove a relation where a member stream is related to a category stream.
* Can only remove one relation at a time. Adjusts weights after removal.
* @method unrelate
* @static
* @param {string} $asUserId
* The user who is making unrelate operation on the stream (remove stream from category)
* @param {string} $toPublisherId
* The user who has published the category stream
* @param {string|array} $toStreamName
* The name of the category stream. Pass an array of strings to relate a single stream
* to multiple categories, but in that case make sure fromStreamName is a string.
* @param {string} $type
* The type of the relation
* @param {string} $fromPublisherId
* The user who has published the related stream
* @param {string} $fromStreamName
* The name of the related stream. Pass an array of strings to relate multiple streams
* to a single category, but in that case make sure toStreamName is a string.
* @param {array} $options=array()
* An array of options that can include:
* @param {boolean} [$options.skipAccess=false] If true, skips the access checks and just unrelates the stream from the category
* @param {boolean} [$options.skipMessageTo=false] If true, skips posting the Streams/unrelatedTo message to the "to" streams
* @param {boolean} [$options.skipMessageFrom=false] If true, skips posting the Streams/unrelatedFrom message to the "from" streams
* @param {boolean} [$options.adjustWeights=false] If true, also decrements all following relations' weights by one.
* @return {boolean}
* Whether the relation was removed
*/
static function unrelate(
	$asUserId,
	$toPublisherId,
	$toStreamName,
	$type,
	$fromPublisherId,
	$fromStreamName,
	$options = array())
{
	// Normalizes the arguments, checks access and loads the existing relation rows.
	// Everything from $relatedTo onwards is filled in by reference.
	self::getRelations(
		$asUserId,
		$toPublisherId,
		$toStreamName,
		$type,
		$fromPublisherId,
		$fromStreamName,
		$relatedTo,
		$relatedFrom,
		$categories,
		$streams,
		$arrayField,
		$options);

	// nothing to remove
	if (!$relatedTo && !$relatedFrom) {
		return false;
	}

	// Only one relation is removed per call, so take the first of each
	$relatedTo = reset($relatedTo);
	$relatedFrom = reset($relatedFrom);
	$category = reset($categories);
	$stream = reset($streams);
	if (is_array($toStreamName)) {
		$toStreamName = reset($toStreamName);
	}
	if (is_array($fromStreamName)) {
		$fromStreamName = reset($fromStreamName);
	}

	// Now, clean up the relation.
	/**
	 * @event Streams/unrelateTo/$streamType {before}
	 * @param {string} relatedTo
	 * @param {string} relatedFrom
	 * @param {string} asUserId
	 * @return {false} To cancel further processing
	 */
	if (Q::event(
		"Streams/unrelateTo/{$category->type}",
		compact('relatedTo', 'relatedFrom', 'asUserId'),
		'before') === false
	) {
		// canceled by a hook: report a boolean, as documented
		return false;
	}

	/**
	 * @event Streams/unrelateFrom/$streamType {before}
	 * @param {string} relatedTo
	 * @param {string} relatedFrom
	 * @param {string} asUserId
	 * @return {false} To cancel further processing
	 */
	if (Q::event(
		"Streams/unrelateFrom/{$stream->type}",
		compact('relatedTo', 'relatedFrom', 'asUserId'),
		'before') === false
	) {
		// canceled by a hook: report a boolean, as documented
		return false;
	}

	/*
	 * remove 'Streams/relation' from $relatedTo.
	 * we consider category stream as 'remote' i.e. more error prone.
	 */

	// remember the weight before the row is removed
	$weight = isset($relatedTo->weight) ? $relatedTo->weight : null;
	if ($relatedTo && $relatedTo->remove()) {
		if (isset($weight) and !empty($options['adjustWeights'])) {
			// shift down every relation in this category that was weighted above the removed one
			$criteria = array(
				'toPublisherId' => $toPublisherId,
				'toStreamName' => $category->name,
				'type' => $type,
				'weight' => new Db_Range($weight, false, false, null)
			);
			Streams_RelatedTo::update()->set(array(
				'weight' => new Db_Expression("weight - 1")
			))->where($criteria)->execute();
		}

		// keep the category's relation total in step
		Streams_RelatedToTotal::update()->set(array(
			'relationCount' => new Db_Expression('relationCount - 1')
		))->where(array(
			'toPublisherId' => $category->publisherId,
			'toStreamName' => $category->name,
			'relationType' => $type,
			'fromStreamType' => $stream->type,
		))->execute();

		// Send Streams/unrelatedTo message to a stream
		// node server will be notified by Streams_Message::post
		if (empty($options['skipMessageTo'])) {
			Streams_Message::post($asUserId, $toPublisherId, $category->name, array(
				'type' => 'Streams/unrelatedTo',
				'instructions' => compact(
					'fromPublisherId', 'fromStreamName', 'type', 'options', 'weight'
				)
			), true);
		}
	}

	if ($relatedFrom && $relatedFrom->remove()) {
		// keep the member stream's relation total in step
		Streams_RelatedFromTotal::update()->set(array(
			'relationCount' => new Db_Expression('relationCount - 1')
		))->where(array(
			'fromPublisherId' => $stream->publisherId,
			'fromStreamName' => $stream->name,
			'relationType' => $type,
			'toStreamType' => $category->type,
		))->execute();

		if (empty($options['skipMessageFrom'])) {
			// Send Streams/unrelatedFrom message to a stream
			// node server will be notified by Streams_Message::post
			Streams_Message::post($asUserId, $fromPublisherId, $stream->name, array(
				'type' => 'Streams/unrelatedFrom',
				'instructions' => compact(
					'toPublisherId', 'toStreamName', 'type', 'options'
				)
			), true);
		}
	}

	/**
	 * @event Streams/unrelateFrom/$streamType {after}
	 * @param {string} relatedTo
	 * @param {string} relatedFrom
	 * @param {string} asUserId
	 */
	Q::event(
		"Streams/unrelateFrom/{$stream->type}",
		compact('relatedTo', 'relatedFrom', 'asUserId'),
		'after'
	);

	/**
	 * @event Streams/unrelateTo/$categoryType {after}
	 * @param {string} relatedTo
	 * @param {string} relatedFrom
	 * @param {string} asUserId
	 */
	Q::event(
		"Streams/unrelateTo/{$category->type}",
		compact('relatedTo', 'relatedFrom', 'asUserId'),
		'after'
	);

	return true;
}
/**
* Fetch all the streams which are related to, or from, a given stream.
* Right now, all the streams that are fetched have to be from the same publisher.
* So, if there are relations to streams from other publishers, you have to additionally
* go ahead and fetch them yourself.
* @method related
* @static
* @param {string} $asUserId
* The user who is fetching
* @param {string} $publisherId
* The publisher of the stream
* @param {string|array|Db_Range} $streamName
* The name of the stream which is presumably related to/from other streams
* @param {mixed} $isCategory=true
* If false, returns the categories that this stream is related to.
* If true, returns all the streams related to this category.
* If a string, returns all the streams related to this category with names prefixed by this string.
* @param {array} $options=array()
* @param {boolean} [$options.orderBy=false] Defaults to false, which means order by decreasing weight. True means order by increasing weight.
* @param {integer} [$options.limit] number of records to fetch
* @param {integer} [$options.offset] offset to start from
* @param {double} [$options.min] the minimum orderBy value (inclusive) to filter by, if any
* @param {double} [$options.max] the maximum orderBy value (inclusive) to filter by, if any
* @param {string|array|Db_Range} [$options.type] if specified, this filters the type of the relation.
* Can be useful for implementing custom indexes using relations and varying the value of "type".
* @param {string|array|Db_Range} [$options.weight] if specified, this filters the weight of the relation.
* Can be useful for implementing custom indexes using relations and varying the weight ranges.
* Only used if $options.isCategory is true.
* @param {string} [$options.prefix] if specified, this filters by the prefix of the related streams
* @param {string} [$options.title] if specified, this filters the titles of the streams with a LIKE condition
* @param {array} [$options.where] you can also specify any extra conditions here
* @param {array} [$options.fetchOptions] An array of any options to pass to Streams::fetch when fetching streams
* @param {array} [$options.relationsOnly] If true, returns only the relations to/from stream, doesn't fetch the other data. Useful if publisher id of relation objects is not the same as provided by publisherId.
* @param {array} [$options.streamsOnly] If true, returns only the streams related to/from stream, doesn't return the other data.
* @param {array} [$options.streamFields] If specified, fetches only the fields listed here for any streams.
* @param {callable} [$options.filter] Optional function to call to filter the relations. It should return a filtered array of relations.
* @param {array} [$options.skipFields] Optional array of field names. If specified, skips these fields when fetching streams
* @param {array} [$options.skipTypes] Optional array of ($streamName => $relationTypes) to skip when fetching relations.
* @param {array} [$options.includeTemplates] Defaults to false. Pass true here to include template streams (whose name ends in a slash) among the related streams.
* @return {array}
* Returns array($relations, $relatedStreams, $stream).
* However, if $streamName wasn't a string or ended in "/"
* then the third parameter is an array of streams.
* Also, if you passed "relationsOnly" or "streamsOnly", then it returns
* the $relations or $relatedStreams array by itself, respectively.
*/
static function related(
	$asUserId,
	$publisherId,
	$streamName,
	$isCategory = true,
	$options = array())
{
	// A stream name ending in "/" is turned into a Db_Range, so the lookup
	// below may match several streams instead of a single one.
	if (is_string($streamName) and substr($streamName, -1) === '/') {
		$streamName = new Db_Range($streamName, false, false, true);
	}
	$returnMultiple = !is_string($streamName);
	// Allow the short form related($asUserId, $publisherId, $streamName, $options)
	if (is_array($isCategory)) {
		$options = $isCategory;
		$isCategory = true;
	}
	$skipTypes = Q::ifset($options, 'skipTypes', array());

	// Check access to stream
	$fetchOptions = isset($options['fetchOptions']) ? $options['fetchOptions'] : null;
	$rows = Streams::fetch($asUserId, $publisherId, $streamName, '*', $fetchOptions);
	$streams = array();
	foreach($rows as $n => $row) {
		if (!$row) continue;
		if (!$row->testReadLevel('relations')) {
			throw new Users_Exception_NotAuthorized();
		}
		if (!$row->testReadLevel('participants')) {
			// Without "participants" read access, participation relations are hidden
			$skipTypes[$n][] = 'Streams/participating';
		}
		$streams[$n] = $row;
	}
	if (!$streams) {
		// Nothing was fetched, so return the empty form of the requested shape
		if (!empty($options['relationsOnly'])
		|| !empty($options['streamsOnly'])) {
			return array();
		}
		return array(array(), array(), $returnMultiple ? array() : null);
	}
	$stream = reset($streams);

	// Categories are looked up on the "to" side, members on the "from" side
	if ($isCategory) {
		$query = Streams_RelatedTo::select()
		->where(array(
			'toPublisherId' => $publisherId,
			'toStreamName' => $streamName
		));
	} else {
		$query = Streams_RelatedFrom::select()
		->where(array(
			'fromPublisherId' => $publisherId,
			'fromStreamName' => $streamName
		));
	}
	if ($isCategory) {
		// Ordering and exact-weight filtering are only applied for categories
		if (empty($options['orderBy'])) {
			$query = $query->orderBy('weight', false);
		} else if ($options['orderBy'] === true) {
			$query = $query->orderBy('weight', true);
		}
		if (!empty($options['weight'])) {
			$query = $query->andWhere(array('weight' => $options['weight']));
		}
	}
	if (isset($options['prefix'])) {
		if (substr($options['prefix'], -1) !== '/') {
			throw new Q_Exception("prefix has to end in a slash", 'prefix');
		}
		$other_field = $isCategory ? 'fromStreamName' : 'toStreamName';
		$query = $query->where(array(
			$other_field => new Db_Range($options['prefix'], true, false, true)
		));
	}
	if (!empty($options['title'])) {
		if (!is_string($options['title'])) {
			// Report the option that was actually wrong (was mislabeled as 'filter')
			throw new Q_Exception_WrongType(array(
				'field' => 'title',
				'type' => 'string'
			));
		}
		$query = $query->where(array(
			'title LIKE ' => $options['title']
		));
	}

	// Paging is bounded by the configured limits
	$max_limit = Q_Config::expect('Streams', 'db', 'limits', 'stream');
	$max_offset = (Q_Config::expect('Streams', 'db', 'pages') - 1) * $max_limit - 1;
	$offset = !empty($options['offset']) ? $options['offset'] : 0;
	$limit = !empty($options['limit']) ? $options['limit'] : $max_limit;
	if (!is_numeric($offset) or $offset > $max_offset) {
		throw new Q_Exception("Streams::related offset is too large, must be <= $max_offset");
	}
	if (!is_numeric($limit) or $limit > $max_limit) {
		throw new Q_Exception("Streams::related limit is too large, must be <= $max_limit");
	}
	$min = isset($options['min']) ? $options['min'] : null;
	$max = isset($options['max']) ? $options['max'] : null;
	if (isset($min) or isset($max)) {
		$range = new Db_Range($min, true, true, $max);
		$query = $query->where(array('weight' => $range));
	}
	// $limit has been validated as numeric above, so it is always applied
	$query = $query->limit($limit, $offset);
	if (isset($options['type'])) {
		$query = $query->where(array('type' => $options['type']));
	}
	if (isset($options['where'])) {
		$query = $query->where($options['where']);
	}
	$FT = $isCategory ? 'from' : 'to';
	if (empty($options['includeTemplates'])) {
		// Exclude template streams, i.e. those whose name ends in a slash
		$col = $FT.'StreamName';
		$query = $query->where(new Db_Expression(
			"SUBSTRING($col, -1, 1) != '/'"
		));
	}

	$col2 = $isCategory ? 'toStreamName' : 'fromStreamName';

	$relations = $query->fetchDbRows();
	foreach ($relations as $k => $v) {
		// Templates are dropped only when they were NOT requested. The original
		// condition was inverted and removed them exactly when includeTemplates was set.
		if (empty($options['includeTemplates'])
		and substr($k, -1) === '/') {
			unset($relations[$k]);
		} else if (!empty($skipTypes[$v->$col2])
		and in_array($v->type, $skipTypes[$v->$col2])) {
			unset($relations[$k]);
		}
	}
	if (empty($relations)) {
		if (!empty($options['relationsOnly'])
		or !empty($options['streamsOnly'])) {
			return array();
		}
		return array(array(), array(), $returnMultiple ? $streams : $stream);
	}
	if (!empty($options['filter'])) {
		// The callback receives all relations and returns the ones to keep
		$relations = call_user_func($options['filter'], $relations);
	}
	if (!empty($options['relationsOnly'])) {
		return $relations;
	}

	// Work out which stream fields to fetch; skipFields takes precedence over streamFields
	$fields = '*';
	if (isset($options['skipFields'])) {
		$skip_fields = is_array($options['skipFields'])
			? $options['skipFields']
			: explode(',', $options['skipFields']);
		$fields = implode(',', array_diff(Streams_Stream::fieldNames(), $skip_fields));
	} else if (isset($options['streamFields'])) {
		$fields = is_string($options['streamFields'])
			? $options['streamFields']
			: implode(',', $options['streamFields']);
	}

	// Only streams published by $publisherId are fetched. Relations pointing at
	// other publishers stay in $relations but get no entry in $relatedStreams.
	$names = array();
	$FTP = $FT.'PublisherId';
	$FSN = $FT.'StreamName';
	foreach ($relations as $name => $r) {
		if ($r->$FTP === $publisherId) {
			$names[] = $r->$FSN;
		}
	}
	$relatedStreams = Streams::fetch(
		$asUserId, $publisherId, $names, $fields, $fetchOptions
	);
	foreach ($relatedStreams as $name => $s) {
		if (!$s) continue;
		// NOTE(review): this lookup assumes $relations is keyed by stream name,
		// but fetchDbRows() is called without an index above - confirm.
		$weight = isset($relations[$name]->weight)
			? $relations[$name]->weight
			: null;
		$s->set('weight', $weight);
	}
	if (!empty($options['streamsOnly'])) {
		return $relatedStreams;
	}
	return array(
		$relations,
		$relatedStreams,
		$returnMultiple ? $streams : $stream
	);
}
/**
* Updates the weight on a relation
* @param {string} $asUserId
* The id of the user on whose behalf the app will be updating the relation
* @param {string} $toPublisherId
* The publisher of the stream on the 'to' end of the relation
* @param {string} $toStreamName
* The name of the stream on the 'to' end of the relation
* @param {string} $type
* The type of the relation
* @param {string} $fromPublisherId
* The publisher of the stream on the 'from' end of the relation
* @param {string} $fromStreamName
* The name of the stream on the 'from' end of the relation
* @param {double} $weight
* The new weight
* @param {double} [$adjustWeights=1]
* The amount to move the other weights by, to make room for this one.
* This should be a positive number - the direction of the shift is determined automatically.
* Or, set to 0 to prevent moving the other weights.
* @param {array} $options=array()
* @param {boolean} [$options.skipAccess=false] If true, skips the access checks and just updates the relation
* @param {string|array} [$options.extra] Any info to save in the "extra" field.
* @return {array|boolean}
* Returns false if the operation was canceled by a hook
* Otherwise returns array with key "to" and value of type Streams_Message
*/
static function updateRelation(
	$asUserId,
	$toPublisherId,
	$toStreamName,
	$type,
	$fromPublisherId,
	$fromStreamName,
	$weight,
	$adjustWeights = 1,
	$options = array())
{
	// $relatedTo, $relatedFrom, $categories, $streams and $arrayField are not
	// defined before this call; getRelations() presumably fills them by reference.
	self::getRelations(
		$asUserId,
		$toPublisherId,
		$toStreamName,
		$type,
		$fromPublisherId,
		$fromStreamName,
		$relatedTo,
		$relatedFrom,
		$categories,
		$streams,
		$arrayField,
		$options);

	// Only the first row / stream of each result set is used
	$relatedTo = reset($relatedTo);
	$relatedFrom = reset($relatedFrom);
	$category = reset($categories);
	$stream = reset($streams);

	if (!$relatedTo) {
		throw new Q_Exception_MissingRow(
			array('table' => 'relatedTo', 'criteria' => 'with those fields'),
			array('publisherId', 'name', 'type', 'toPublisherId', 'to_name')
		);
	}
//		if (!$relatedFrom->retrieve()) {
//			throw new Q_Exception_MissingRow(
//				array('table' => 'relatedFrom', 'criteria' => 'those fields'),
//				array('publisherId', 'name', 'type', 'fromPublisherId', 'from_name')
//			);
//		}

	if (empty($options['skipAccess'])) {
		if (!$category->testWriteLevel('relations')) {
			throw new Users_Exception_NotAuthorized();
		}
	}

	if (!empty($options['extra'])) {
		$extra = $options['extra'];
		$relatedTo->extra = is_string($extra) ? $extra : Q::json_encode($extra);
	}

	/**
	 * @event Streams/updateRelation/$streamType {before}
	 * @param {string} relatedTo
	 * @param {string} relatedFrom
	 * @param {string} asUserId
	 * @param {double} weight
	 * @param {double} previousWeight
	 */
	$previousWeight = $relatedTo->weight;
	if (empty($adjustWeights)) {
		$adjustWeights = 0;
	}
	if (!is_numeric($adjustWeights) or $adjustWeights < 0) {
		// Key is 'field', matching the other Q_Exception_WrongType throws in this file
		throw new Q_Exception_WrongType(array(
			'field' => 'adjustWeights',
			'type' => 'a non-negative number'
		));
	}
	// Moving a relation down shifts the displaced ones up, and vice versa
	$adjustWeightsBy = $weight < $previousWeight ? $adjustWeights : -$adjustWeights;

	// Build the hook parameters once. "extra" is included only when it was supplied,
	// so compact() never references an undefined variable (a warning on PHP 7.3+).
	$eventParams = compact(
		'relatedTo', 'relatedFrom', 'type', 'weight',
		'previousWeight', 'adjustWeightsBy', 'asUserId'
	);
	if (isset($extra)) {
		$eventParams['extra'] = $extra;
	}
	if (Q::event(
		"Streams/updateRelation/{$stream->type}",
		$eventParams,
		'before') === false
	) {
		return false;
	}

	if ($adjustWeights
	and $weight !== $previousWeight) {
		// Shift every relation whose weight lies between the old and new positions
		$criteria = array(
			'toPublisherId' => $toPublisherId,
			'toStreamName' => $toStreamName,
			'type' => $type,
			'weight' => $weight < $previousWeight
				? new Db_Range($weight, true, false, $previousWeight)
				: new Db_Range($previousWeight, false, true, $weight)
		);
		Streams_RelatedTo::update()->set(array(
			'weight' => new Db_Expression("weight + " . $adjustWeightsBy)
		))->where($criteria)->execute();
	}

	$relatedTo->weight = $weight;
	$relatedTo->save();

	// Send Streams/updatedRelateTo message to the category stream
	// node server will be notified by Streams_Message::post
	$message = Streams_Message::post($asUserId, $toPublisherId, $toStreamName, array(
		'type' => 'Streams/updatedRelateTo',
		'instructions' => compact(
			'fromPublisherId', 'fromStreamName', 'type', 'weight', 'previousWeight', 'adjustWeightsBy', 'asUserId'
		)
	), true);

	/**
	 * @event Streams/updateRelation/$categoryType {after}
	 * @param {string} relatedTo
	 * @param {string} relatedFrom
	 * @param {string} asUserId
	 * @param {double} weight
	 * @param {double} previousWeight
	 */
	Q::event(
		"Streams/updateRelation/{$category->type}",
		$eventParams,
		'after'
	);

	return $message;
}
/**
* If the user is not participating in the stream yet,
* inserts a participant record and posts a "Streams/join" or "Streams/visit" type message
* to the stream, depending on whether the user is already participating in the stream.
* Otherwise updates the participant record's timestamp and other things.
* Also relates every stream joined to streams named under the config field
* "Streams"/"types"/$streamType/"participating"
* @method join
* @static
* @param {string} $asUserId The id of the user that is joining. Pass null here to use the logged-in user's id.
* @param {string} $publisherId The id of the user publishing all the streams
* @param {array} $streams An array of Streams_Stream objects or stream names
* @param {array} [$options=array()] An associative array of options.
* @param {boolean} [$options.subscribed] If true, the user is set as subscribed
* @param {boolean} [$options.posted] If true, the user is marked as having posted
* @param {array} [$options.extra] Any extra information to tree-merge for the participants
* @param {boolean} [$options.noVisit] If user is already participating, don't post a "Streams/visit" message
* @param {boolean} [$options.skipAccess] If true, skip access check for whether user can join
* @param {boolean} [$options.skipRelationMessages=true] if true, skip posting messages on the stream about being
* related to the Streams/participating streams of the joining user (the one with id $asUserId)
* @return {array} Returns an array of (streamName => Streams_Participant) pairs.
* The newly inserted rows will have wasInserted() return true.
*/
static function join(
	$asUserId,
	$publisherId,
	$streams,
	$options = array())
{
	// _getStreams() also normalizes $asUserId (it takes it by reference)
	$streams2 = self::_getStreams($asUserId, $publisherId, $streams);
	$streamNames = array();
	foreach ($streams2 as $s) {
		$streamNames[] = $s->name;
	}
	if (empty($options['skipAccess'])) {
		self::_accessExceptions($streams2, $streamNames, 'join');
	}
	$participants = Streams_Participant::select()
	->where(array(
		'publisherId' => $publisherId,
		'streamName' => $streamNames,
		'userId' => $asUserId
	))->ignoreCache()->fetchDbRows(null, null, 'streamName');
	$streamNamesMissing = array();
	$streamNamesUpdate = array();
	$messages = array();
	$results = array();
	$state = 'participating';
	$updateCounts = array();
	// these fields will be modified in the streams_participant table rows
	$changedFields = compact('state');
	foreach ($streamNames as $sn) {
		if (!isset($participants[$sn])) {
			// No participant row yet: counted under the '' previous state, inserted below
			$updateCounts[''][] = $sn;
			$streamNamesMissing[] = $sn;
			continue;
		}
		$stream = $streams2[$sn];
		// Rows are objects, so no reference is needed to modify them in place.
		// (The reference used before stayed bound after the loop and was written through later.)
		$participant = $participants[$sn];
		// Only a truthy option marks the participant, as documented
		// and as done for newly inserted rows below
		if (!empty($options['subscribed'])) {
			$changedFields['subscribed'] = $participant->subscribed = 'yes';
		}
		if (!empty($options['posted'])) {
			$changedFields['posted'] = $participant->posted = 'yes';
		}
		if (isset($options['extra'])) {
			// NOTE(review): the merged extra is set on the in-memory row only;
			// the bulk update below writes $changedFields, which has no "extra".
			$extra = Q::json_decode($participant->extra, true);
			$tree = new Q_Tree($extra);
			$tree->merge($options['extra']);
			$participant->extra = Q::json_encode($tree->getAll(), true);
		}
		$streamNamesUpdate[] = $sn;
		// Already participating means this is a visit, otherwise a (re)join
		$type = ($participant->state === 'participating') ? 'visit' : 'join';
		$prevState = $participant->state;
		$participant->state = $state;
		$updateCounts[$prevState][] = $sn;
		if (empty($options['noVisit']) or $type !== 'visit') {
			// Send a message to Node
			Q_Utils::sendToNode(array(
				"Q/method" => "Streams/Stream/$type",
				"participant" => Q::json_encode($participant->toArray()),
				"stream" => Q::json_encode($stream->toArray()),
				"prevState" => $prevState
			));
			// Stream messages to post
			$messages[$publisherId][$sn] = array(
				'type' => "Streams/$type",
				'instructions' => array(
					'prevState' => $prevState,
					'extra' => isset($participant->extra) ? $participant->extra : array()
				)
			);
		}
		$results[$sn] = $participant;
	}
	if ($streamNamesUpdate) {
		Streams_Participant::update()
			->set($changedFields)
			->where(array(
				'publisherId' => $publisherId,
				'streamName' => $streamNamesUpdate,
				'userId' => $asUserId
			))->execute();
	}
	self::_updateCounts($publisherId, $updateCounts, $state);
	if ($streamNamesMissing) {
		$rows = array();
		foreach ($streamNamesMissing as $sn) {
			$stream = $streams2[$sn];
			$extra = isset($options['extra'])
				? $options['extra']
				: '';
			if (is_array($extra)) {
				$extra = Q::json_encode($extra);
			}
			$results[$sn] = $rows[$sn] = new Streams_Participant(array(
				'publisherId' => $publisherId,
				'streamName' => $sn,
				'userId' => $asUserId,
				'streamType' => $stream->type,
				'state' => 'participating',
				'subscribed' => !empty($options['subscribed']) ? 'yes' : 'no',
				'posted' => !empty($options['posted']) ? 'yes' : 'no',
				'extra' => $extra
			));
		}
		Streams_Participant::insertManyAndExecute($rows);
		foreach ($streamNamesMissing as $sn) {
			$stream = $streams2[$sn];
			$participant = $rows[$sn];
			// Send a message to Node
			Q_Utils::sendToNode(array(
				"Q/method" => "Streams/Stream/join",
				"participant" => Q::json_encode($participant->fields),
				"stream" => Q::json_encode($stream->toArray()),
				"prevState" => null
			));
			// Stream messages to post
			$messages[$publisherId][$sn] = array(
				'type' => 'Streams/join',
				'instructions' => array(
					'prevState' => null,
					'extra' => isset($participant->extra) ? $participant->extra : array()
				)
			);
		}
	}
	Streams_Message::postMessages($asUserId, $messages, true);
	// Relate to participating streams
	$relateStreams = array();
	foreach ($results as $sn => $p) {
		$participatingNames = Streams_Stream::getConfigField(
			$p->streamType, array('participating'), array()
		);
		foreach ($participatingNames as $pn) {
			$relateStreams[$pn][$p->streamType][] = $sn;
		}
	}
	foreach ($relateStreams as $pn => $streamTypes) {
		foreach ($streamTypes as $streamType => $names) {
			// The category stream is published by the joining user; create it on demand
			if (!Streams::fetchOne($asUserId, $asUserId, $pn)) {
				Streams::create($asUserId, $asUserId, 'Streams/participating', array('name' => $pn));
			}
			$extraArray = array();
			foreach ($names as $sn) {
				$extraArray[$sn] = $results[$sn]->extra;
			}
			Streams::relate(
				$asUserId, $asUserId, $pn,
				$streamType, $publisherId, $names,
				array(
					'skipMessageFrom' => Q::ifset($options, 'skipRelationMessages', true),
					'skipAccess' => true,
					'weight' => time(),
					'extra' => $extraArray
				)
			);
		}
	}
	return $results;
}
/**
* If the user is participating in (some of) the streams, sets state of participant row
* as "left" and posts a "Streams/leave" type message to the streams.
* Also unrelates every stream left to streams named under the config field
* "Streams"/"types"/$streamType/"participating"
* @method leave
* @static
* @param {string} $asUserId The id of the user that is leaving. Pass null here to use the logged-in user's id.
* @param {string} $publisherId The id of the user publishing all the streams
* @param {array} $streams An array of Streams_Stream objects or stream names
* @param {array} [$options=array()] An associative array of options.
* @param {array} [$options.extra] Any extra information to tree-merge for the participants
* @param {boolean} [$options.skipAccess] If true, skip access check for whether user can join
* @param {boolean} [$options.skipRelationMessages=true] if true, skip posting messages on the stream about being unrelated from the Streams/participating streams of the leaving user (the one with id $asUserId).
* @return {array} Returns an array of (streamName => Streams_Participant) pairs
*/
static function leave(
	$asUserId,
	$publisherId,
	$streams,
	$options = array())
{
	// _getStreams() also normalizes $asUserId (it takes it by reference)
	$streams2 = self::_getStreams($asUserId, $publisherId, $streams);
	$streamNames = array();
	foreach ($streams2 as $s) {
		$streamNames[] = $s->name;
	}
	if (empty($options['skipAccess'])) {
		self::_accessExceptions($streams2, $streamNames, 'join');
	}
	$participants = Streams_Participant::select()
	->where(array(
		'publisherId' => $publisherId,
		'streamName' => $streamNames,
		'userId' => $asUserId
	))->ignoreCache()->fetchDbRows(null, null, 'streamName');
	$streamNamesUpdate = array();
	$streamNamesMissing = array();
	$updateCounts = array();
	// Initialized up front so postMessages() never receives an undefined variable
	$messages = array();
	$state = 'left';
	foreach ($streamNames as $sn) {
		if (!isset($participants[$sn])) {
			// No participant row yet: counted under the '' previous state, inserted below
			$updateCounts[''][] = $sn;
			$streamNamesMissing[] = $sn;
			continue;
		}
		// Rows are objects, so no reference is needed to modify them in place.
		// A reference here would stay bound to this array slot and be overwritten
		// by the later foreach ($participants as $sn => $p), corrupting the result.
		$p = $participants[$sn];
		if ($p->state === $state) {
			continue;
		}
		if (isset($options['extra'])) {
			// NOTE(review): the merged extra is set on the in-memory row only;
			// the bulk update below writes just the "state" column.
			$extra = Q::json_decode($p->extra, true);
			$tree = new Q_Tree($extra);
			$tree->merge($options['extra']);
			$p->extra = Q::json_encode($tree->getAll(), true);
		}
		$streamNamesUpdate[] = $sn;
		$updateCounts[$p->state][] = $sn;
		// Remember the previous state for the messages sent further down
		$p->set('prevState', $p->state);
		$p->state = $state;
	}
	if ($streamNamesUpdate) {
		Streams_Participant::update()
			->set(compact('state'))
			->where(array(
				'publisherId' => $publisherId,
				'streamName' => $streamNamesUpdate,
				'userId' => $asUserId
			))->execute();
	}
	self::_updateCounts($publisherId, $updateCounts, $state);
	if ($streamNamesMissing) {
		$rows = array();
		foreach ($streamNamesMissing as $sn) {
			$stream = $streams2[$sn];
			// Encode array extras to JSON, the same way join() does
			$extra = !empty($options['extra']) ? $options['extra'] : '';
			if (is_array($extra)) {
				$extra = Q::json_encode($extra);
			}
			$participants[$sn] = $rows[$sn] = new Streams_Participant(array(
				'publisherId' => $publisherId,
				'streamName' => $sn,
				'userId' => $asUserId,
				'streamType' => $stream->type,
				// The row records the "left" state, consistent with the
				// {$state}Count increment made by _updateCounts() above
				'state' => $state,
				'subscribed' => !empty($options['subscribed']) ? 'yes' : 'no',
				'posted' => !empty($options['posted']) ? 'yes' : 'no',
				'extra' => $extra
			));
		}
		Streams_Participant::insertManyAndExecute($rows);
	}
	// NOTE(review): a leave message is sent for every stream, including those
	// the user had already left (those are sent with a null prevState).
	foreach ($streamNames as $sn) {
		$stream = $streams2[$sn];
		$participant = Q::ifset($participants, $sn, null);
		$prevState = $participant ? $participant->get('prevState', null) : null;
		// Send a message to Node
		Q_Utils::sendToNode(array(
			"Q/method" => "Streams/Stream/leave",
			"participant" => Q::json_encode($participant->toArray()),
			"stream" => Q::json_encode($stream->toArray()),
			"prevState" => $prevState
		));
		// Stream messages to post
		$messages[$publisherId][$sn] = array(
			'type' => 'Streams/leave',
			'instructions' => array(
				'prevState' => $prevState,
				'extra' => isset($participant->extra) ? $participant->extra : array()
			)
		);
	}
	Streams_Message::postMessages($asUserId, $messages, true);
	// Unrelate to participating streams
	$unrelateStreams = array();
	foreach ($participants as $sn => $p) {
		$participatingNames = Streams_Stream::getConfigField(
			$p->streamType, array('participating'), array()
		);
		foreach ($participatingNames as $pn) {
			$unrelateStreams[$pn][$p->streamType][] = $sn;
		}
	}
	foreach ($unrelateStreams as $pn => $streamTypes) {
		foreach ($streamTypes as $streamType => $names) {
			if (!Streams::fetchOne($asUserId, $asUserId, $pn)) {
				// NOTE(review): join() creates this category with the type
				// 'Streams/participating'; here the type is null - confirm intent.
				Streams::create($asUserId, $asUserId, null, array('name' => $pn));
			}
			Streams::unrelate(
				$asUserId, $asUserId, $pn,
				$streamType, $publisherId, $names,
				array(
					'skipMessageFrom' => Q::ifset($options, 'skipRelationMessages', true),
					'skipAccess' => true,
					'weight' => time()
				)
			);
		}
	}
	return $participants;
}
/**
* Subscribe to one or more streams, to start receiving notifications.
* Posts "Streams/subscribe" message to the streams.
* Also posts "Streams/subscribed" messages to user's "Streams/participating" stream.
* If options are not given check the subscription templates:
* 1. generic publisher id and generic user
* 2. exact publisher id and generic user
* 3. generic publisher id and exact user
* default is to subscribe to ALL messages.
* If options are supplied - skip templates and use options.
* Using subscribe if subscription is already active will modify existing
* subscription - change type(s) or modify notifications
* @method subscribe
* @static
* @param {string} $asUserId The id of the user that is subscribing. Pass null here to use the logged-in user's id.
* @param {string} $publisherId The id of the user publishing all the streams
* @param {array} $streams An array of Streams_Stream objects or stream names
* @param {array} [$options=array()] Options for the subscribe() and join() methods
* @param {array} [$options.filter] optional array with two keys
* @param {array} [$options.filter.types] array of message types, if this is empty then subscribes to all types
* @param {array} [$options.filter.notifications=0] limit number of notifications, 0 means no limit
* @param {datetime} [$options.untilTime=null] time limit, if any for subscription
* @param {array} [$options.rule=array()] optionally override the rule for new subscriptions
* @param {array} [$options.rule.deliver=array('to'=>'default')] under "to" key,
* named the field under Streams/rules/deliver config, which will contain the names of destinations,
* which can include "email", "mobile", "email+pending", "mobile+pending"
* @param {datetime} [$options.rule.readyTime] time from which user is ready to receive notifications again
* @param {array} [$options.rule.filter] optionally set a filter for the rules to add
* @param {boolean} [$options.skipRules=false] if true, do not attempt to create rules for new subscriptions
* @param {boolean} [$options.skipAccess=false] if true, skip access check for whether user can join and subscribe
* @param {boolean} [$options.skipMessage=false] if true, skip posting the "Streams/subscribe" message to the stream
* @return {array} An array of Streams_Participant rows from the database.
*/
static function subscribe(
	$asUserId,
	$publisherId,
	$streams,
	$options = array())
{
	// _getStreams() also normalizes $asUserId (it takes it by reference)
	$streams2 = self::_getStreams($asUserId, $publisherId, $streams);
	$streamNames = array();
	foreach ($streams2 as $s) {
		$streamNames[] = $s->name;
	}
	if (empty($options['skipAccess'])) {
		self::_accessExceptions($streams2, $streamNames, 'join');
	}
	// Subscribing implies joining. "noVisit" makes join() skip the visit
	// message for users who are already participating.
	$o = array_merge($options, array(
		'subscribed' => true,
		'noVisit' => true
	));
	$participants = Streams::join($asUserId, $publisherId, $streams2, $o);

	// Explicitly passed options are written to existing subscriptions too.
	// Collect only the ones that were given, so nothing undefined is referenced.
	$changes = array();
	if (isset($options['filter'])) {
		$filter = Q::json_encode($options['filter']);
		$changes['filter'] = $filter;
	}
	$db = Streams_Subscription::db();
	if (isset($options['untilTime'])) {
		$untilTime = $db->toDateTime($options['untilTime']);
		$changes['untilTime'] = $untilTime;
	}
	$subscriptions = array();
	$rows = Streams_Subscription::select()
	->where(array(
		'publisherId' => $publisherId,
		'streamName' => $streamNames,
		'ofUserId' => $asUserId
	))->fetchAll(PDO::FETCH_ASSOC);
	foreach ($rows as $row) {
		$sn = $row['streamName'];
		$subscriptions[$sn] = $row;
	}
	$messages = array();
	$streamNamesMissing = array();
	$streamNamesUpdate = array();
	foreach ($streamNames as $sn) {
		$messages[$publisherId][$sn] = array('type' => 'Streams/subscribe');
		if (empty($subscriptions[$sn])) {
			$streamNamesMissing[] = $sn;
			continue;
		}
		if ($changes) {
			$streamNamesUpdate[] = $sn;
		}
	}
	if ($streamNamesUpdate) {
		Streams_Subscription::update()
			->set($changes)
			->where(array(
				'publisherId' => $publisherId,
				'streamName' => $streamNamesUpdate,
				'ofUserId' => $asUserId
			))->execute();
	}
	$rules = array();
	if ($streamNamesMissing) {
		// Group the streams without a subscription by stream type,
		// because templates and config defaults are looked up per type
		$types = array();
		foreach ($streamNamesMissing as $sn) {
			$stream = $streams2[$sn];
			$types[$stream->type][] = $sn;
		}
		$subscriptionRows = array();
		$ruleRows = array();
		foreach ($types as $type => $sns) {
			// insert subscriptions
			$template = null;
			if (!isset($filter) or !isset($untilTime)) {
				$templates = Streams_Subscription::select()
					->where(array(
						'publisherId' => array('', $publisherId),
						'streamName' => $type.'/',
						'ofUserId' => array('', $asUserId)
					))->fetchAll(PDO::FETCH_ASSOC);
				// Prefer a template with an exact publisher or exact user over a generic one.
				// The user column is "ofUserId" (see the query above), not "userId".
				foreach ($templates as $t) {
					if (!$template
					or ($template['publisherId'] == '' and $t['publisherId'] !== '')
					or ($template['ofUserId'] == '' and $t['ofUserId'] !== '')) {
						$template = $t;
					}
				}
			}
			// Per-type values: defaults derived for one stream type
			// must not carry over to the next type in this loop
			if (isset($filter)) {
				$typeFilter = $filter;
			} else {
				$typeFilter = Q::json_encode($template
					? Q::json_decode($template['filter'])
					: Streams_Stream::getConfigField($type, array(
						'subscriptions', 'filter'
					), array(
						"types" => array(
							"^(?!(Users/)|(Streams/)).*/",
							"Streams/relatedTo",
							"Streams/announcement",
							"Streams/chat/message"
						),
						"notifications" => 0
					))
				);
			}
			if (isset($untilTime)) {
				$typeUntilTime = $untilTime;
			} else {
				$typeUntilTime = ($template and $template['duration'] > 0)
					? new Db_Expression("CURRENT_TIMESTAMP + INTERVAL $template[duration] SECOND")
					: null;
			}
			foreach ($sns as $sn) {
				$subscriptions[$sn] = $subscriptionRows[] = new Streams_Subscription(array(
					'publisherId' => $publisherId,
					'streamName' => $sn,
					'ofUserId' => $asUserId,
					'untilTime' => $typeUntilTime,
					'filter' => $typeFilter
				));
			}

			if (!empty($options['skipRules'])) {
				continue;
			}

			// insert up to one rule per subscription
			$rule = null;
			if (isset($options['rule'])) {
				$rule = $options['rule'];
				if (isset($rule['readyTime'])) {
					$rule['readyTime'] = $db->toDateTime($rule['readyTime']);
				}
				if (isset($rule['filter']) and is_array($rule['filter'])) {
					$rule['filter'] = Q::json_encode($rule['filter']);
				}
				if (isset($rule['deliver']) and is_array($rule['deliver'])) {
					$rule['deliver'] = Q::json_encode($rule['deliver']);
				}
			}
			if (!isset($rule)) {
				$templates = Streams_SubscriptionRule::select()
					->where(array(
						'ofUserId' => array('', $asUserId),
						'publisherId' => array('', $publisherId),
						'streamName' => $type.'/',
						'ordinal' => 1
					))->fetchAll(PDO::FETCH_ASSOC);
				// Same precedence as for subscription templates, using "ofUserId"
				foreach ($templates as $t) {
					if (!$rule
					or ($rule['ofUserId'] == '' and $t['ofUserId'] !== '')
					or ($rule['publisherId'] == '' and $t['publisherId'] !== '')) {
						$rule = $t;
					}
				}
			}
			if (!isset($rule)) {
				$rule = array(
					'deliver' => '{"to": "default"}',
					'filter' => '{"types": [], "labels": []}'
				);
			}
			if ($rule) {
				$rule['ofUserId'] = $asUserId;
				$rule['publisherId'] = $publisherId;
				if (empty($rule['readyTime'])) {
					$rule['readyTime'] = new Db_Expression("CURRENT_TIMESTAMP");
				}
				foreach ($sns as $sn) {
					$row = $rule;
					$row['streamName'] = $sn;
					$row['ordinal'] = 1;
					// NOTE(review): this discards any rule filter that was
					// passed in or taken from a template - confirm intent.
					$row['filter'] = '';
					$rules[$sn] = $ruleRows[] = $row;
					$messages[$publisherId][$sn]['instructions'] = Q::json_encode(array(
						'rule' => $row
					));
				}
			}
		}
		Streams_Subscription::insertManyAndExecute($subscriptionRows);
		Streams_SubscriptionRule::insertManyAndExecute($ruleRows);
	}
	// $streams2 is indexed by stream name (it is read as $streams2[$sn] above),
	// while the raw $streams input may be a plain list of names
	$streams5 = Q::take($streams2, $streamNames);
	Q_Utils::sendToNode(array(
		"Q/method" => "Streams/Stream/subscribe",
		"subscriptions" => Q::json_encode($subscriptions),
		"streams" => Q::json_encode($streams5),
		"rules" => Q::json_encode($rules)
	));
	if (empty($options['skipMessage'])) {
		Streams_Message::postMessages($asUserId, $messages, true);
	}
	return $participants;
}
/**
* Unsubscribe from one or more streams, to stop receiving notifications.
* Posts "Streams/unsubscribe" message to the streams.
* Also posts "Streams/unsubscribed" messages to user's "Streams/participating" stream.
* Does not change the actual subscription, but only the participant row.
* (When subscribing again, the existing subscription will be used.)
* @method unsubscribe
* @static
* @param {string} $asUserId The id of the user that is unsubscribing. Pass null here to use the logged-in user's id.
* @param {string} $publisherId The id of the user publishing all the streams
* @param {array} $streams An array of Streams_Stream objects or stream names
* @param {array} [$options=array()]
* @param {boolean} [$options.leave=false] set to true to also leave the streams
* @param {boolean} [$options.skipAccess=false] if true, skip access check for whether user can join and subscribe
* @param {boolean} [$options.skipMessage=false] if true, skip posting the "Streams/unsubscribe" message to the stream
* @return {array} Returns an array of Streams_Participant rows, if any were in the database.
*/
static function unsubscribe(
	$asUserId,
	$publisherId,
	$streams,
	$options = array())
{
	// _getStreams() also normalizes $asUserId (it takes it by reference)
	$streams2 = self::_getStreams($asUserId, $publisherId, $streams);
	$streamNames = array();
	foreach ($streams2 as $s) {
		$streamNames[] = $s->name;
	}
	if (empty($options['skipAccess'])) {
		self::_accessExceptions($streams2, $streamNames, 'join');
	}
	$skipAccess = Q::ifset($options, 'skipAccess', false);
	if (empty($options['leave'])) {
		// Only the participant rows change; the subscription rows are left as they are
		$criteria = array(
			'publisherId' => $publisherId,
			'streamName' => $streamNames,
			'userId' => $asUserId
		);
		Streams_Participant::update()
			->set(array('subscribed' => 'no'))
			->where($criteria)->execute();
		// Index the rows by stream name, as join() and leave() do, because
		// they are looked up by stream name in the loop below
		$participants = Streams_Participant::select()
			->where($criteria)
			->fetchDbRows(null, null, 'streamName');
	} else {
		$participants = Streams::leave(
			$asUserId, $publisherId, $streams2, compact('skipAccess')
		);
	}
	$messages = array();
	foreach ($streamNames as $sn) {
		$stream = $streams2[$sn];
		// $participant stays null when the user has no participant row for this stream
		if ($participant = Q::ifset($participants, $sn, null)) {
			if ($participant instanceof Streams_Participant) {
				$participant = $participant->toArray();
			}
		}
		// Send a message to Node
		Q_Utils::sendToNode(array(
			"Q/method" => "Streams/Stream/unsubscribe",
			"participant" => Q::json_encode($participant),
			"stream" => Q::json_encode($stream->toArray())
		));
		// Stream messages to post
		$messages[$publisherId][$sn] = array('type' => 'Streams/unsubscribe');
	}
	if (empty($options['skipMessage'])) {
		Streams_Message::postMessages($asUserId, $messages, true);
	}
	return $participants;
}
private static function _getStreams(&$asUserId, $publisherId, $streams)
{
	// Normalize $asUserId (passed by reference) to a plain user id:
	// a Users_User object yields its id, null falls back to the logged-in user
	if ($asUserId instanceof Users_User) {
		$asUserId = $asUserId->id;
	} else if (!isset($asUserId)) {
		$asUserId = Users::loggedInUser(true)->id;
	}
	// Return value is ignored; presumably throws if there is no such user - confirm
	Users::fetch($asUserId, true);

	// Separate the names that still have to be fetched from the
	// Streams_Stream objects that were handed in directly
	$toFetch = array();
	$given = array();
	foreach ($streams as $item) {
		if (is_string($item)) {
			$toFetch[] = $item;
			continue;
		}
		if ($item instanceof Streams_Stream) {
			$given[$item->name] = $item;
			continue;
		}
		// null entries are silently ignored, anything else is an error
		if (isset($item)) {
			throw new Q_Exception_WrongType(array(
				'field' => 'stream',
				'type' => 'Streams_Stream or string'
			));
		}
	}
	$fetched = Streams::fetch($asUserId, $publisherId, $toFetch, array('refetch' => true));

	// Freshly fetched rows win over given objects with the same key;
	// null entries are left out of the result
	$found = array();
	foreach (array_merge($given, $fetched) as $key => $stream) {
		if (isset($stream)) {
			$found[$key] = $stream;
		}
	}
	return $found;
}
/**
* Gets the streams related by relation of type "Streams/participating"
* to the given category stream. Typically, these relations are added and removed
* when the user joins or leaves the related streams, which is done automatically
* by the Streams plugin with the streams named under the config field
* "Streams"/"types"/$streamType/"participating", which is an array of stream names.
* @method participating
* @static
* @param {string|array|Db_Range} [$type=null]
* Filter the type(s) of the streams to return, that the user is participating in.
* If set, it is stored as $options['type'], overriding any value passed there.
* @param {array} [$options=array()] options you can pass to the Streams::related() method
* @param {string} [$options.publisherId=Users::loggedInUser(true)->id] the publisher of the category stream
* @param {string} [$options.categoryName='Streams/participating'] the name of the category stream
* @param {string} [$options.asUserId=Users::loggedInUser(true)->id] the user to fetch as
* @return {array}
* Returns array($relations, $relatedStreams, $stream).
* However, if $streamName wasn't a string or ended in "/"
* then the third parameter is an array of streams.
* Also, if $options['streamsOnly'] or $options['relationsOnly'] is set,
* then returns only $relatedStreams or $relations.
*/
static function participating(
	$type = null,
	$options = array())
{
	// An explicit $type overrides whatever $options['type'] contained
	if ($type !== null) {
		$options['type'] = $type;
	}

	// Category stream to read from
	if (isset($options['categoryName'])) {
		$categoryName = $options['categoryName'];
	} else {
		$categoryName = 'Streams/participating';
	}

	// The logged-in user is only consulted when no publisher was given
	if (isset($options['publisherId'])) {
		$publisherId = $options['publisherId'];
	} else {
		$publisherId = Users::loggedInUser(true)->id;
	}

	// By default, fetch on behalf of the publisher
	if (isset($options['asUserId'])) {
		$asUserId = $options['asUserId'];
	} else {
		$asUserId = $publisherId;
	}

	return Streams::related($asUserId, $publisherId, $categoryName, true, $options);
}
/**
 * Checks that every named stream passes the given write level,
 * and throws on the first one that doesn't.
 * @method _accessExceptions
 * @static
 * @private
 * @param {array} $streams2 array of $streamName => stream object
 * @param {array} $streamNames the names of the streams to check
 * @param {string|integer} $writeLevel the write level passed to testWriteLevel()
 * @throws {Streams_Exception_NoSuchStream} if the stream fails testReadLevel('see'),
 *  so its existence is not revealed
 * @throws {Users_Exception_NotAuthorized} if the stream can be seen but the write level is insufficient
 */
private static function _accessExceptions($streams2, $streamNames, $writeLevel)
{
	foreach ($streamNames as $name) {
		$candidate = $streams2[$name];
		if ($candidate->testWriteLevel($writeLevel)) {
			continue; // enough access, check the next one
		}
		if ($candidate->testReadLevel('see')) {
			throw new Users_Exception_NotAuthorized();
		}
		throw new Streams_Exception_NoSuchStream();
	}
}
/**
 * Adjusts the "{state}Count" columns of streams after state changes.
 * For every group of stream names whose previous state differs from $state,
 * increments the counter of the new state and, if there was a previous state,
 * decrements the counter of that previous state.
 * @method _updateCounts
 * @static
 * @private
 * @param {string} $publisherId the publisher of the streams
 * @param {array} $updateCounts array of $prevState => array of stream names
 * @param {string} $state the new state
 */
private static function _updateCounts($publisherId, $updateCounts, $state)
{
	foreach ($updateCounts as $prevState => $names) {
		if ($prevState === $state) {
			// nothing changed for this group
			continue;
		}
		$incrementField = $state.'Count';
		$changes = array();
		$changes[$incrementField] = new Db_Expression($incrementField.' + 1');
		if ($prevState) {
			$decrementField = $prevState.'Count';
			$changes[$decrementField] = new Db_Expression($decrementField.' - 1');
		}
		$criteria = array(
			'publisherId' => $publisherId,
			'name' => $names
		);
		Streams_Stream::update()
			->set($changes)
			->where($criteria)
			->execute();
	}
}
/**
 * Invites a user (or a future user) to a stream.
 * @method invite
 * @static
 * @param {string} $publisherId The id of the stream publisher
 * @param {string} $streamName The name of the stream the user will be invited to
 * @param {array} $who Array that can contain the following keys:
 * @param {string|array} [$who.userId] user id or an array of user ids
 * @param {string} [$who.platform] platform for which uids are passed
 * @param {string|array} [$who.uid] platform uid or array of uids
 * @param {string|array} [$who.label] label or an array of labels, or tab-delimited string
 * @param {string|array} [$who.identifier] identifier such as an email or mobile number, or an array of identifiers, or tab-delimited string
 * @param {integer} [$who.newFutureUsers] the number of new Users_User objects to create via Users::futureUser in order to invite them to this stream. This typically is used in conjunction with passing the "html" option to this function.
 * @param {boolean} [$who.token=false] pass true here to save a Streams_Invite row
 *  with empty userId, which is used whenever someone shows up with the token
 *  and presents it via "Q.Streams.token" querystring parameter.
 *  See the Streams/before/Q_objects.php hook for more information.
 * @param {array} [$options=array()]
 * @param {string|array} [$options.addLabel] label or an array of ($label => array($title, $icon)) for adding publisher's contacts
 * @param {string|array} [$options.addMyLabel] label or an array of ($label => array($title, $icon)) for adding asUserId's contacts
 * @param {string|integer} [$options.readLevel] the read level to grant those who are invited
 * @param {string|integer} [$options.writeLevel] the write level to grant those who are invited
 * @param {string|integer} [$options.adminLevel] the admin level to grant those who are invited
 * @param {array} [$options.permissions] array of additional permissions to grant
 * @param {string} [$options.displayName] the display name to use to represent the inviting user
 * @param {string} [$options.appUrl] Can be used to override the URL to which the invited user will be redirected and receive "Q.Streams.token" in the querystring.
 * @param {array} [$options.html] an array of ($template, $batchName) such as ("MyApp/foo.handlebars", "foo") for generating html snippets which can then be viewed from and printed via the action Streams/invitations?batchName=$batchName&invitingUserId=$asUserId&limit=$limit&offset=$offset
 * @param {string} [$options.asUserId=Users::loggedInUser(true)->id] Invite as this user id, defaults to logged-in user
 * @param {boolean} [$options.skipAccess] whether to skip access checks when adding labels and contacts
 * @see Users::addLink()
 * @return {array} Returns array with keys
 *  "success", "userIds", "statuses", "identifiers", "identifierTypes", "alreadyParticipating".
 *  The userIds array contains userIds from "userId" first, then "identifiers", "uids", "label",
 *  then "newFutureUsers". It is the raw list, in one-to-one correspondence with
 *  the statuses and identifierTypes arrays, so it may contain repeated ids
 *  and ids of users who are already participating.
 *  If the "token" option was set to true, the array also contains the "invite"
 *  key pointing to a Streams_Invite object that was saved in the database
 *  (whose userId field is empty because anyone with the token may accept this invite).
 * @throws {Users_Exception_NotAuthorized} if the inviting user lacks the admin level 'invite'
 *  or the write level 'join', or tries to grant levels or permissions beyond their own
 * @throws {Q_Exception_WrongType} if $options.html is not an array of at least 2 elements
 * @throws {Q_Exception_MissingFile} if the template in $options.html can't be found
 * @throws {Q_Exception_WrongValue} if the template in $options.html doesn't end in .handlebars
 */
static function invite($publisherId, $streamName, $who, $options = array())
{
	if (isset($options['asUserId'])) {
		$asUserId = $options['asUserId'];
		$asUser = Users_User::fetch($asUserId);
	} else {
		$asUser = Users::loggedInUser(true);
		$asUserId = $asUser->id;
	}

	// Fetch the stream as the inviting user
	$stream = Streams::fetchOne($asUserId, $publisherId, $streamName, true);

	// Do we have enough admin rights to invite others to this stream?
	if (!$stream->testAdminLevel('invite') || !$stream->testWriteLevel('join')) {
		throw new Users_Exception_NotAuthorized();
	}

	if (isset($options['html'])) {
		$html = $options['html'];
		if (!is_array($html) or count($html) < 2) {
			throw new Q_Exception_WrongType(array(
				'field' => "options.html",
				'type' => 'array of 2 strings'
			));
		}
		list($template, $batchName) = $html;
		// validate these paths
		$filename = APP_VIEWS_DIR.DS.$template;
		if (!Q::realPath($filename)) {
			throw new Q_Exception_MissingFile(compact('filename'));
		}
		$ext = pathinfo($template, PATHINFO_EXTENSION);
		if ($ext !== 'handlebars') {
			throw new Q_Exception_WrongValue(array(
				'field' => 'options.html[0]',
				'range' => 'a filename with extension .handlebars'
			));
		}
		$path = Streams::invitationsPath($asUserId).DS.$batchName;
		Q_Utils::canWriteToPath($path, true, true);
	}

	// These arrays are built up in parallel: the i-th status and identifier type
	// describe the i-th raw user id.
	$raw_userIds = array();
	$identifiers = array();
	$identifierTypes = array();
	$statuses = array();

	// get user ids if any to array, throw if user not found
	if (isset($who['userId'])) {
		$userIds = $who['userId'];
		if (!is_array($userIds)) {
			$userIds = array($userIds);
		}
		$users = Users::fetch($userIds, true);
		$raw_userIds = array_keys($users);
		foreach ($users as $uid => $user) {
			$identifierTypes[] = 'userId';
			$statuses[] = $user->sessionCount ? 'verified' : 'future';
		}
	}

	// merge identifiers if any
	if (isset($who['identifier'])) {
		$identifiers = $who['identifier'];
		if (is_string($identifiers)) {
			$identifiers = array_map('trim', explode("\t", $identifiers)) ;
		}
		$identifier_ids = Users_User::idsFromIdentifiers(
			$identifiers, $statuses1, $identifierTypes1
		);
		$raw_userIds = array_merge($raw_userIds, $identifier_ids);
		$statuses = array_merge($statuses, $statuses1);
		$identifierTypes = array_merge($identifierTypes, $identifierTypes1);
	}

	if (!empty($who['platform']) and !empty($who['uid'])) {
		// merge users from platform uids
		$platform = $who['platform'];
		$uids = $who['uid'];
		if (is_string($uids)) {
			$uids = array_map('trim', explode("\t", $uids)) ;
		}
		$statuses2 = array();
		$raw_userIds = array_merge(
			$raw_userIds,
			Users_User::idsFromPlatformUids($platform, $uids, $statuses2)
		);
		$statuses = array_merge($statuses, $statuses2);
		$identifiers = array_merge($identifiers, $uids);
		$identifierTypes2 = array_fill(0, count($statuses2), $platform);
		$identifierTypes = array_merge($identifierTypes, $identifierTypes2);
	}

	// merge labels if any
	if (isset($who['label'])) {
		$label = $who['label'];
		if (is_string($label)) {
			$label = array_map('trim', explode("\t", $label)) ;
		}
		$userIdsFromLabels = Users_User::labelsToIds($asUserId, $label);
		$raw_userIds = array_merge($raw_userIds, $userIdsFromLabels);
		foreach ($userIdsFromLabels as $userId) {
			$statuses[] = 'verified'; // NOTE: this may occasionally be inaccurate
			$identifierTypes[] = 'label';
		}
	}

	if (!empty($who['newFutureUsers'])) {
		$nfu = $who['newFutureUsers'];
		for ($i=0; $i<$nfu; ++$i) {
			$raw_userIds[] = Users::futureUser('none', null)->id;
			$statuses[] = 'future';
			$identifierTypes[] = 'newFutureUsers';
		}
	}

	// ensure that each userId is included only once
	// and remove already participating users
	$userIds = array_unique($raw_userIds);
	$alreadyParticipating = Streams_Participant::filter(
		$userIds, $stream->publisherId, $stream->name, null
	);
	// Subtract from the de-duplicated list (not from $raw_userIds, which would
	// bring the duplicates back), and reindex because array_diff() preserves keys
	// and a list with gaps would be JSON-encoded as an object instead of an array.
	$userIds = array_values(array_diff($userIds, $alreadyParticipating));

	$appUrl = !empty($options['appUrl'])
		? $options['appUrl']
		: Q_Request::baseUrl().'/'.Q_Config::get(
			"Streams", "types", $stream->type,
			"invite", "url", "{{Streams}}/stream"
		);

	// now check and define levels for invited user
	$readLevel = isset($options['readLevel']) ? $options['readLevel'] : null;
	if (isset($readLevel)) {
		$readLevel = Streams_Stream::numericReadLevel($readLevel);
		if (!$stream->testReadLevel($readLevel)) {
			// We can't assign greater read level to other people than we have ourselves!
			throw new Users_Exception_NotAuthorized();
		}
	}
	$writeLevel = isset($options['writeLevel']) ? $options['writeLevel'] : null;
	if (isset($writeLevel)) {
		$writeLevel = Streams_Stream::numericWriteLevel($writeLevel);
		if (!$stream->testWriteLevel($writeLevel)) {
			// We can't assign greater write level to other people than we have ourselves!
			throw new Users_Exception_NotAuthorized();
		}
	}
	$adminLevel = isset($options['adminLevel']) ? $options['adminLevel'] : null;
	if (isset($adminLevel)) {
		$adminLevel = Streams_Stream::numericAdminLevel($adminLevel);
		if (!$stream->testAdminLevel($adminLevel+1)) {
			// We can't assign an admin level greater, or equal, to our own!
			// A stream's publisher can assign owners. Owners can assign admins.
			// Admins can confer powers to invite others, to some people.
			// Those people can confer the privilege to publish a message re this stream.
			// But admins can't assign other admins, and even stream owners
			// can't assign other owners.
			throw new Users_Exception_NotAuthorized();
		}
	}
	$permissions = isset($options['permissions']) ? $options['permissions'] : null;
	if (isset($permissions)) {
		// the inviting user's own permissions don't change between iterations
		$byPermissions = $stream->get('permissions', array());
		foreach ($permissions as $permission) {
			if (!in_array($permission, $byPermissions)) {
				// We can't assign permissions we don't have ourselves
				throw new Users_Exception_NotAuthorized();
			}
		}
	}

	// calculate expireTime
	$duration = Q_Config::get("Streams", "types", $stream->type, "invite", "duration", false);
	$expireTime = $duration ? strtotime("+$duration seconds") : null;

	// false here means the label and contact helpers skip their access checks
	$asUserId2 = empty($options['skipAccess']) ? $asUserId : false;

	// NOTE: from here on $label refers to $options['addLabel'], not $who['label']
	if ($label = Q::ifset($options, 'addLabel', null)) {
		if (is_string($label)) {
			$label = explode("\t", $label);
		}
		Users_Label::addLabel($label, $publisherId, null, null, $asUserId2, true);
	}
	if ($myLabel = Q::ifset($options, 'addMyLabel', null)) {
		if (is_string($myLabel)) {
			$myLabel = explode("\t", $myLabel);
		}
		Users_Label::addLabel($myLabel, $asUserId, null, null, $asUserId2, true);
	}

	foreach ($raw_userIds as $userId) {
		Users_Contact::addContact($asUserId, "Streams/invited", $userId, null, false, true);
		Users_Contact::addContact($asUserId, "Streams/invited/{$stream->type}", $userId, null, false, true);
		Users_Contact::addContact($userId, "Streams/invitedMe", $asUserId, null, false, true);
		Users_Contact::addContact($userId, "Streams/invitedMe/{$stream->type}", $asUserId, null, false, true);
		if ($label) {
			$label2 = Q::isAssociative($label) ? array_keys($label) : $label;
			Users_Contact::addContact($publisherId, $label2, $userId, null, $asUserId2, true);
		}
		if ($myLabel) {
			$myLabel2 = Q::isAssociative($myLabel) ? array_keys($myLabel) : $myLabel;
			Users_Contact::addContact($asUserId, $myLabel2, $userId, null, $asUserId2, true);
		}
	}

	// let node handle the rest, and get the result
	$displayName = isset($options['displayName'])
		? $options['displayName']
		: Streams::displayName($asUser);
	$params = array(
		"Q/method" => "Streams/Stream/invite",
		"invitingUserId" => $asUserId,
		"username" => $asUser->username,
		"userIds" => Q::json_encode($userIds),
		"stream" => Q::json_encode($stream->toArray()),
		"appUrl" => $appUrl,
		"label" => $label,
		"myLabel" => $myLabel,
		"readLevel" => $readLevel,
		"writeLevel" => $writeLevel,
		"adminLevel" => $adminLevel,
		"permissions" => $permissions,
		"displayName" => $displayName,
		"expireTime" => $expireTime
	);
	if (!empty($template)) {
		$params['template'] = $template;
		$params['batchName'] = $batchName;
	}
	$result = Q_Utils::queryInternal('Q/node', $params);

	$return = array(
		'success' => $result,
		'userIds' => $raw_userIds,
		'statuses' => $statuses,
		'identifiers' => $identifiers,
		'identifierTypes' => $identifierTypes,
		'alreadyParticipating' => $alreadyParticipating
	);

	if (!empty($who['token'])) {
		// an invite with an empty userId can be accepted by anyone presenting its token
		$invite = new Streams_Invite();
		$invite->userId = '';
		$invite->publisherId = $publisherId;
		$invite->streamName = $streamName;
		$invite->invitingUserId = $asUserId;
		$invite->displayName = $displayName;
		$invite->appUrl = $appUrl;
		$invite->readLevel = $readLevel;
		$invite->writeLevel = $writeLevel;
		$invite->adminLevel = $adminLevel;
		$invite->state = 'pending';
		$invite->save();
		$return['invite'] = $invite;
	}

	$instructions = array_merge($who, $options, compact(
		'displayName', 'appUrl', 'readLevel', 'writeLevel', 'adminLevel', 'permissions'
	));
	Streams_Message::post($asUserId, $publisherId, $streamName, array(
		'type' => 'Streams/invite',
		'instructions' => $instructions
	), true);

	/**
	 * @event Streams/invite {after}
	 * @param {string} success
	 * @param {array} userIds
	 * @param {array} statuses
	 * @param {array} identifiers
	 * @param {array} identifierTypes
	 * @param {array} alreadyParticipating
	 * @param {Streams_Invite} invite
	 */
	Q::event('Streams/invite', $return, 'after');

	return $return;
}
/**
 * Each user is allowed at most one request per stream.
 * If a request already exists, then this function returns it, even if it has
 * different parameters.
 * Otherwise, it inserts a request for upgrading access to a stream,
 * and schedules actions to be taken automatically if the request is granted.
 * It also posts a "Streams/request" message to the stream, so that someone
 * with the adminLevel >= invite and adequate readLevel, writeLevel and permissions
 * can grant the request.
 * @method request
 * @static
 * @param {string} $publisherId The id of the stream publisher
 * @param {string} $streamName The name of the stream the request is about
 * @param {array} [$options=array()] All of these are optional
 * @param {string} [$options.userId=Users::loggedInUser()->id]
 *  The id of the user requesting the upgraded access.
 * @param {string|integer} [$options.readLevel] upgrade to read level being requested
 * @param {string|integer} [$options.writeLevel] upgrade to write level being requested
 * @param {string|integer} [$options.adminLevel] upgrade to admin level being requested
 * @param {array} [$options.permissions] array of additional permissions to request
 * @param {array} [$options.actions] array of actions to take automatically
 *  right after a request is granted, e.g. "Streams/join" or "Streams/subscribe".
 *  These can be interpreted in the "after" hook for "Streams/request" event.
 * @return {Streams_Request} The request row that already existed, or was just inserted into the database
 * @throws {Q_Exception_WrongValue} if $options.permissions is set but is not an array
 */
static function request($publisherId, $streamName, $options = array())
{
	// get the user id
	if (isset($options['userId'])) {
		$userId = $options['userId'];
		$user = Users_User::fetch($userId);
	} else {
		$user = Users::loggedInUser(true);
		$userId = $user->id;
	}

	// see if such a request already exists
	$request = new Streams_Request();
	$request->publisherId = $publisherId;
	$request->streamName = $streamName;
	$request->userId = $userId;
	if ($request->retrieve()) {
		return $request;
	}

	// fetch the stream as the requesting user
	$stream = Streams::fetchOne($userId, $publisherId, $streamName, true);

	// process any requested levels
	$readLevel = isset($options['readLevel']) ? $options['readLevel'] : null;
	if (isset($readLevel)) {
		$request->readLevel = Streams_Stream::numericReadLevel($readLevel);
	}
	$writeLevel = isset($options['writeLevel']) ? $options['writeLevel'] : null;
	if (isset($writeLevel)) {
		$request->writeLevel = Streams_Stream::numericWriteLevel($writeLevel);
	}
	$adminLevel = isset($options['adminLevel']) ? $options['adminLevel'] : null;
	if (isset($adminLevel)) {
		$request->adminLevel = Streams_Stream::numericAdminLevel($adminLevel);
	}

	// Permissions are optional like the levels above, so only validate
	// and store them when the caller actually passed some.
	$permissions = isset($options['permissions']) ? $options['permissions'] : null;
	if (isset($permissions)) {
		if (!is_array($permissions)) {
			throw new Q_Exception_WrongValue(array(
				'field' => 'permissions',
				'range' => 'array'
			));
		}
		$request->permissions = Q::json_encode($permissions);
	}

	// calculate expireTime
	$duration = Q_Config::get("Streams", "types", $stream->type, "request", "duration", false);
	$request->expireTime = $duration ? strtotime("+$duration seconds") : null;

	// fill out the rest of the fields
	$request->state = 'pending';

	/**
	 * @event Streams/request {before}
	 * @param {Streams_Request} request
	 * @param {string} userId
	 */
	Q::event('Streams/request', compact('request', 'userId'), 'before');

	$request->save();

	// Send Streams/request message to the stream
	Streams_Message::post($userId, $publisherId, $streamName, array(
		'type' => 'Streams/request',
		'instructions' => Q::take($request->fields, array(
			'readLevel', 'writeLevel', 'adminLevel', 'expireTime', 'state'
		))
	), true);

	/**
	 * @event Streams/request {after}
	 * @param {Streams_Request} request
	 */
	Q::event('Streams/request', compact('request'), 'after');

	return $request;
}
/**
 * Closes a stream, which prevents anyone from posting messages to it
 * unless they have WRITE_LEVEL >= "close", as well as attempting to remove
 * all relations to other streams. A "cron job" can later go and delete
 * closed streams. The reason you should avoid deleting streams right away
 * is that other subscribers may still want to receive the last messages
 * posted to the stream.
 * @method close
 * @static
 * @param {string} $asUserId The id of the user who would be closing the stream.
 *  If null, the logged-in user is used, or "" if nobody is logged in.
 *  A Users_User object is also accepted.
 * @param {string} $publisherId The id of the user publishing the stream
 * @param {string} $streamName The name of the stream
 * @param {array} [$options=array()] Can include "skipAccess"
 * @return {boolean} true if the stream was saved as closed and the
 *  "Streams/closed" message was posted, false if saving returned a falsy value
 * @throws {Q_Exception_MissingRow} if there is no such stream
 * @throws {Users_Exception_NotAuthorized} if access is checked and the user
 *  is neither the publisher nor has write level "close"
 */
static function close($asUserId, $publisherId, $streamName, $options = array())
{
	if (!isset($asUserId)) {
		$loggedIn = Users::loggedInUser();
		$asUserId = $loggedIn ? $loggedIn : "";
	}
	if ($asUserId instanceof Users_User) {
		$asUserId = $asUserId->id;
	}

	$stream = new Streams_Stream();
	$stream->publisherId = $publisherId;
	$stream->name = $streamName;
	if (!$stream->retrieve()) {
		throw new Q_Exception_MissingRow(array(
			'table' => 'stream',
			'criteria' => "{publisherId: '$publisherId', name: '$streamName'}"
		));
	}

	// Authorization check: the publisher may always close their own stream
	$mustCheckAccess = empty($options['skipAccess']) && $asUserId !== $publisherId;
	if ($mustCheckAccess) {
		$stream->calculateAccess($asUserId);
		if (!$stream->testWriteLevel('close')) {
			throw new Users_Exception_NotAuthorized();
		}
	}

	// Pass 1 ($asCategory === true): relations from other streams to this category.
	// Pass 2 ($asCategory === false): relations from this stream to categories.
	// Removing relations is best-effort, so failures are deliberately ignored.
	foreach (array(true, false) as $asCategory) {
		list($relations, $related) = Streams::related(
			$asUserId,
			$stream->publisherId,
			$stream->name,
			$asCategory
		);
		foreach ($relations as $relation) {
			try {
				if ($asCategory) {
					// NOTE(review): the "from" stream is passed in the same positions
					// where the second pass passes the "to" stream, and there is no
					// skipAccess here -- confirm against Streams::unrelate()'s signature
					Streams::unrelate(
						$asUserId,
						$relation->fromPublisherId,
						$relation->fromStreamName,
						$relation->type,
						$stream->publisherId,
						$stream->name
					);
				} else {
					Streams::unrelate(
						$asUserId,
						$relation->toPublisherId,
						$relation->toStreamName,
						$relation->type,
						$stream->publisherId,
						$stream->name,
						array(
							'skipAccess' => true
						)
					);
				}
			} catch (Exception $e) {}
		}
	}

	// Mark the stream as closed, using the database's current time
	$db = $stream->db();
	$closedTime = $db->toDateTime($db->getCurrentTimestamp());
	$stream->closedTime = $closedTime;
	if (!$stream->save()) {
		return false;
	}
	$stream->post($asUserId, array(
		'type' => 'Streams/closed',
		'content' => '',
		'instructions' => compact('closedTime')
	), true);
	return true;
}
/**
 * Get first and last name out of full name.
 * A name containing a comma is read as "last, first".
 * Otherwise the first space-separated word is the first name and the
 * rest is the last name. A single word is treated as the first name.
 * @method splitFullName
 * @static
 * @param {string} $fullName The string representing full name
 * @return {array} array containing 'first' and 'last' properties, both trimmed strings.
 *  The 'last' property is an empty string when there is no last name.
 */
static function splitFullName ($fullName) {
	$capitalize = Q_Config::get('Streams', 'inputs', 'fullName', 'capitalize', true);
	// Trim up front, so that leading whitespace isn't mistaken
	// for the separator after an (empty) first name.
	$fullName = trim((string)$fullName);
	// Default to an empty string rather than null,
	// since passing null to trim() is deprecated as of PHP 8.1
	$last = '';
	if (strpos($fullName, ',') !== false) {
		list($last, $first) = explode(',', $fullName);
	} else if (strpos($fullName, ' ') !== false) {
		$parts = explode(' ', $fullName);
		// NOTE(review): capitalization is only applied in this branch,
		// not to "last, first" or single-word names -- confirm that's intended
		if ($capitalize) {
			foreach ($parts as $k => $v) {
				$parts[$k] = ucfirst($v);
			}
		}
		$last = join(' ', array_slice($parts, 1));
		$first = $parts[0];
	} else {
		$first = $fullName;
	}
	$first = trim($first);
	$last = trim($last);

	return compact('first', 'last');
}
/**
 * Fetches the stream named "Streams/experience/$experienceId"
 * that is published by the community.
 * @method experience
 * @static
 * @param {string} [$experienceId='main'] the last segment of the stream's name
 * @return {Streams_Stream}
 */
static function experience($experienceId = 'main')
{
	$publisherId = Users::communityId();
	$streamName = "Streams/experience/$experienceId";
	// null as the first argument; true as the last one is passed through to fetchOne()
	return Streams::fetchOne(null, $publisherId, $streamName, true);
}
/**
 * Registers a user. A handler of the 'Streams/register' before event
 * can override standard functionality by returning a value.
 * Method ensures user registration based on full name and also handles registration of
 * invited user
 * @method register
 * @static
 * @param {array|string} $fullName A string, or an array with keys
 * @param {string} $fullName.first The first name
 * @param {string} $fullName.last The last name
 * @param {string|array} $identifier Can be an email address or mobile number. Or it could be an array of $type => $info
 * @param {string} [$identifier.identifier] an email address or phone number
 * @param {array} [$identifier.device] an array with keys
 *  "deviceId", "platform", "appId", "version", "formFactor"
 *  to store in the Users_Device table for sending notifications
 * @param {array} [$identifier.app] an array with "platform" key, and optional "appId"
 * @param {array|string|true} [$icon=array()] Passed on to Users::register().
 *  You can pass here an array of filename => url pairs, or a gravatar url to
 *  download the various sizes from gravatar. Finally, you can pass true to
 *  generate an icon instead of using the default icon.
 *  If $identifier['app']['platform'] is specified, and $icon==true, then
 *  an attempt will be made to download the icon from the user's account on the platform.
 * @param {array} [$options=array()] An array of options that could include:
 * @param {string} [$options.activation] The key under "Users"/"transactional" config to use for sending an activation message. Set to false to skip sending the activation message for some reason.
 * @return {Users_User}
 * @throws {Q_Exception_WrongType} If identifier is not a valid email address or mobile number
 * @throws {Q_Exception} If identifier was already verified for someone else
 * @throws {Users_Exception_AlreadyVerified} If user was already verified
 * @throws {Users_Exception_UsernameExists} If username exists
 */
static function register(
	$fullName,
	$identifier,
	$icon = array(),
	$options = array())
{
	/**
	 * @event Streams/register {before}
	 * @param {string|array} fullName
	 * @param {string|array} identifier
	 * @param {string} icon
	 * @param {array} options
	 * @return {Users_User}
	 */
	// Only defined variables are listed: compact() with an undefined name
	// contributes nothing to the array and raises a notice/warning on PHP >= 7.3
	$return = Q::event('Streams/register', compact(
		'fullName', 'identifier', 'icon', 'options'), 'before'
	);
	if (isset($return)) {
		return $return;
	}

	// this will be used in Streams_after_Users_User_saveExecute
	if (is_string($fullName)) {
		$fullName = Streams::splitFullName($fullName);
	}
	Streams::$cache['fullName'] = $fullName ? $fullName : array(
		'first' => '',
		'last' => ''
	);

	// the username is left empty here
	$user = Users::register("", $identifier, $icon, $options);

	/**
	 * @event Streams/register {after}
	 * @param {string|array} identifier
	 * @param {string} icon
	 * @param {Users_User} user
	 * @param {array} options
	 */
	Q::event('Streams/register', compact(
		'identifier', 'icon', 'user', 'options'
	), 'after');

	return $user;
}
/**
 * A convenience method to get the URL of a streams-related action,
 * with the publisher id and stream name in the querystring.
 * @method actionUrl
 * @static
 * @param {string} $publisherId
 *  The id of the publisher
 * @param {string} $name
 *  The name of the stream
 * @param {string} [$what='stream']
 *  Can be 'stream', 'message' or 'relation'.
 * @return {string|null}
 *  The corresponding URL, or null if $what is not one of the supported values
 */
static function actionUrl($publisherId, $name, $what = 'stream')
{
	$supported = array('stream', 'message', 'relation');
	// in_array() is deliberately not strict here, to compare loosely like a switch does
	if (!in_array($what, $supported)) {
		return null;
	}
	$qs = http_build_query(array(
		'publisherId' => $publisherId,
		'name' => $name
	));
	return Q_Uri::url("Streams/$what?$qs");
}
/**
 * Look up stream by types and title filter
 * @method lookup
 * @static
 * @param {string} $publisherId
 *  The id of the publisher whose streams to look through
 * @param {string|array} $types
 *  The possible stream type, or an array of types
 * @param {string} $title
 *  A string to compare titles by using SQL's "LIKE" statement
 * @return {array}
 *  The rows returned by fetchDbRows(), at most as many as the
 *  "Streams"/"lookup"/"limit" config specifies (10 by default)
 * @throws {Q_Exception_WrongValue}
 *  If the title is longer than one character and starts with "%", while the
 *  "Streams"/"lookup"/"requireTitleIndex" config is true (the default)
 */
static function lookup($publisherId, $types, $title)
{
	// isset() guards against an empty title, for which $title[0]
	// would raise an "uninitialized string offset" warning
	$startsWithWildcard = (isset($title[0]) and $title[0] === '%');
	if ($startsWithWildcard and strlen($title) > 1
	and Q_Config::get('Streams', 'lookup', 'requireTitleIndex', true)) {
		throw new Q_Exception_WrongValue(array(
			'field' => 'title',
			'range' => "something that doesn't start with %"
		));
	}
	$limit = Q_Config::get('Streams', 'lookup', 'limit', 10);
	return Streams_Stream::select()->where(array(
		'publisherId' => $publisherId,
		'type' => $types,
		'title LIKE ' => $title
	))->limit($limit)->fetchDbRows();
}
/**
 * Loads all the interests in a community from the file
 * "files/Streams/interests/$communityId/$basename.json"
 * and returns them as a structured array sorted by key.
 * @method interests
 * @static
 * @param {string} [$communityId=Users::communityId()] the id of the community
 * @return {array} an array of $category => ($subcategory =>) $interest
 */
static function interests($communityId = null)
{
	if ($communityId === null) {
		$communityId = Users::communityId();
	}
	$tree = new Q_Tree();
	$basename = Q_Text::basename();
	$tree->load("files/Streams/interests/$communityId/$basename.json");
	$interests = $tree->getAll();
	foreach ($interests as $category => $entries) {
		foreach ($entries as $key => $entry) {
			if (Q::isAssociative($entry)) {
				// this entry is a subcategory, so sort the interests inside it
				ksort($interests[$category][$key]);
				continue;
			}
			// this entry is not a subcategory, so sort
			// the category itself and move on to the next one
			ksort($interests[$category]);
			break;
		}
	}
	return $interests;
}
/**
 * Gets the classes that extend streams of a given type, together with the
 * names of the fields each one contributes, as configured under
 * "Streams"/"types"/$type/"extend". The result is cached per type
 * for the rest of the request.
 * @method getExtendClasses
 * @static
 * @param {string} $type the type of the stream
 * @return {array} array of $className => array of field names.
 *  When the config doesn't list the fields for a class, all the field names of
 *  its Base_ class are used, except "publisherId", "streamName" and "name".
 * @throws {Q_Exception_MissingClass} if a configured class doesn't exist
 * @throws {Q_Exception_BadValue} if a configured class doesn't extend Db_Row
 */
static function getExtendClasses($type)
{
	static $memo = array();
	if (isset($memo[$type])) {
		return $memo[$type];
	}

	$extend = Q_Config::get('Streams', 'types', $type, 'extend', null);
	if (is_string($extend)) {
		$extend = array($extend);
	}
	if (!$extend) {
		return $memo[$type] = array();
	}

	// normalize a plain list of class names into $className => true
	if (!Q::isAssociative($extend)) {
		$normalized = array();
		foreach ($extend as $className) {
			$normalized[$className] = true;
		}
		$extend = $normalized;
	}

	$classes = array();
	foreach ($extend as $className => $fields) {
		if (!class_exists($className, true)) {
			throw new Q_Exception_MissingClass(array(
				'className' => $className
			));
		}
		if (!is_subclass_of($className, 'Db_Row')) {
			throw new Q_Exception_BadValue(array(
				'internal' => "Streams/types/$type/extend",
				'problem' => "$className must extend Db_Row"
			));
		}
		if ($fields === true) {
			// take every field except the ones identifying the stream
			$all = call_user_func(array('Base_'.$className, 'fieldNames'));
			$fields = array_diff($all, array('publisherId', 'streamName', 'name'));
		} else if (Q::isAssociative($fields)) {
			$fields = array_keys($fields);
		}
		$classes[$className] = $fields;
	}
	return $memo[$type] = $classes;
}
/**
 * Gets the names of the fields for streams of a given type: the basic
 * stream fields, optionally the fields reserved for owners, and then
 * the fields contributed by the classes from Streams::getExtendClasses().
 * @method getExtendFieldNames
 * @static
 * @param {string} $type the type of the stream
 * @param {boolean} [$asOwner=true] whether to include 'readLevel', 'writeLevel',
 *  'adminLevel', 'inheritAccess' and 'closedTime'
 * @return {array} the list of field names
 */
static function getExtendFieldNames($type, $asOwner = true)
{
	$classes = Streams::getExtendClasses($type);
	$fieldNames = array('title', 'icon', 'content', 'attributes');
	if ($asOwner) {
		array_push(
			$fieldNames,
			'readLevel', 'writeLevel', 'adminLevel', 'inheritAccess', 'closedTime'
		);
	}
	foreach ($classes as $extendedFields) {
		foreach ($extendedFields as $fieldName) {
			$fieldNames[] = $fieldName;
		}
	}
	return $fieldNames;
}
/**
 * Builds the themed URL for an invite token, under the base url taken from
 * the "Streams"/"invites"/"baseUrl" config, which defaults to "i".
 * @method invitedUrl
 * @static
 * @param {string} $token the invite token
 * @return {string} the result of Q_Html::themedUrl() for "$baseUrl/$token"
 */
static function invitedUrl ($token) {
	$configPath = array('Streams', 'invites', 'baseUrl');
	$baseUrl = Q_Config::get($configPath, "i");
	$relativeUrl = "$baseUrl/$token";
	return Q_Html::themedUrl($relativeUrl);
}
/**
 * Builds the path of the directory, under APP_FILES_DIR, holding the
 * invitations of a given inviting user. The subpath comes from the
 * "Streams"/"invites"/"subpath" config, with {{app}} interpolated.
 * @method invitationsPath
 * @static
 * @param {string} $invitingUserId the id of the inviting user
 * @return {string} the path, ending with Q_Utils::splitId($invitingUserId)
 */
static function invitationsPath($invitingUserId)
{
	$app = Q::app();
	$subpath = Q_Config::get(
		'Streams', 'invites', 'subpath',
		'{{app}}/uploads/Streams/invitations'
	);
	$segments = array(
		APP_FILES_DIR,
		Q::interpolate($subpath, array('app' => $app)),
		Q_Utils::splitId($invitingUserId)
	);
	return implode(DS, $segments);
}
/**
 * Loads into one tree all the files listed under the "Streams"/"userStreams"
 * config, which maps the name of the app or of a plugin to a filename
 * inside that app's or plugin's config directory.
 * @method userStreamsTree
 * @static
 * @return {Q_Tree} the tree with all the files loaded, in config order
 */
static function userStreamsTree()
{
	$tree = new Q_Tree();
	$filesByModule = Q_Config::get('Streams', 'userStreams', array());
	$app = Q::app();
	foreach ($filesByModule as $module => $filename) {
		// the app's directory is in APP_CONFIG_DIR,
		// a plugin's is in {PLUGIN}_PLUGIN_CONFIG_DIR
		if ($module === $app) {
			$prefix = 'APP';
		} else {
			$prefix = strtoupper($module).'_PLUGIN';
		}
		$configDir = constant($prefix.'_CONFIG_DIR');
		$tree->load($configDir.DS.$filename);
	}
	return $tree;
}
/**
 * Fills in, on streams that were already fetched, the fields contributed by
 * the classes that extend their types (see Streams::getExtendClasses()).
 * Does one query per stream type and extending class, then copies the fields
 * onto each stream and links the stream and the extending row to each other.
 * @method afterFetchExtended
 * @static
 * @protected
 * @param {string} $publisherId the publisher of the streams
 * @param {array} $streams the streams to fill in; falsy entries are skipped
 */
protected static function afterFetchExtended($publisherId, $streams)
{
	if (!$streams) return;

	$streamNamesByType = array();
	foreach ($streams as $stream) {
		if (!$stream) continue;
		$streamNamesByType[$stream->type][] = $stream->name;
	}

	$classes = array();
	$rows = array();
	foreach ($streamNamesByType as $type => $streamNames) {
		$classes[$type] = Streams::getExtendClasses($type);
		foreach ($classes[$type] as $className => $fieldNames) {
			$fetched = call_user_func(array($className, 'select'))
			->where(array(
				'publisherId' => $publisherId,
				'streamName' => $streamNames
			))->fetchDbRows(null, '', 'streamName');
			// The same class may extend several stream types, so add to the rows
			// fetched for earlier types instead of replacing them.
			$rows[$className] = isset($rows[$className])
				? $rows[$className] + $fetched
				: $fetched;
		}
	}

	foreach ($streams as $stream) {
		if (!$stream) continue;
		$streamName = $stream->name;
		foreach ($classes[$stream->type] as $className => $fieldNames) {
			if (empty($rows[$className][$streamName])) continue;
			$row = $rows[$className][$streamName];
			foreach ($fieldNames as $f) {
				$stream->$f = $row->$f;
			}
			// copying the extended fields shouldn't mark the stream as modified
			$stream->wasModified(false);
			$stream->rows[$className] = $row;
			$row->set('Streams_Stream', $stream);
			$stream->set($className, $row);
		}
	}
}
/**
 * Sets "messageTotals" on the streams that were fetched,
 * if $options['withMessageTotals'] asks for it.
 * @method messageTotals
 * @static
 * @private
 * @param {string} $publisherId Stream publisher id
 * @param {string|array} $name Stream name or array of names, used for the "*" key
 * @param {array} $options Can include:
 *  array('withMessageTotals' => array('*' => array('messageType', ...))) for the streams in $name
 *  array('withMessageTotals' => array('streamName' => array('messageType', ...))) for particular streams
 * @param {array} $streams The streams that were fetched
 * @return {array} The same $streams. On every stream that passes testReadLevel('messages'),
 *  "messageTotals" is set to array('messageType' => messageCount, ...)
 */
private static function messageTotals($publisherId, $name, $options, $streams)
{
	if (empty($options['withMessageTotals'])) {
		return $streams;
	}
	$wanted = $options['withMessageTotals'];

	$trows = array();
	if (isset($wanted['*'])) {
		$trows = Streams_MessageTotal::select()->where(array(
			'publisherId' => $publisherId,
			'streamName' => $name,
			'messageType' => $wanted['*']
		))->fetchDbRows();
		unset($wanted['*']);
	}

	// Group the stream names by the list of message types requested for them,
	// so that each distinct list needs only one query. Every name has to be kept:
	// storing a single name per list would drop all but the last one.
	$infoForTotals = array();
	foreach ($wanted as $n => $mt) {
		if (!$mt) {
			continue;
		}
		if (!is_array($mt)) {
			$mt = array($mt);
		}
		ksort($mt);
		$j = json_encode($mt);
		if (!isset($infoForTotals[$j])) {
			$infoForTotals[$j] = array(array(), $mt);
		}
		$infoForTotals[$j][0][] = $n;
	}
	foreach ($infoForTotals as $info) {
		$frows = Streams_MessageTotal::select()->where(array(
			'publisherId' => $publisherId,
			'streamName' => $info[0],
			'messageType' => $info[1]
		))->fetchDbRows();
		$trows = array_merge($trows, $frows);
	}

	foreach ($streams as $s) {
		// Skip missing streams and those whose messages can't be read,
		// but keep going with the rest and still return $streams.
		if (!$s or !$s->testReadLevel('messages')) {
			continue;
		}
		$messageTotals = array();
		foreach ($trows as $row) {
			if ($row->streamName === $s->name) {
				$messageTotals[$row->messageType] = $row->messageCount;
			}
		}
		$s->set('messageTotals', $messageTotals);
	}
	return $streams;
}
/**
 * Sets "relatedToTotals", with info about relations TO streams,
 * on the streams that were fetched, if $options['withRelatedToTotals'] asks for it.
 * @method relatedToTotals
 * @static
 * @private
 * @param {string} $publisherId Stream publisher id
 * @param {string|array} $name Stream name or array of names
 * @param {array} $options Can be:
 *  array('withRelatedToTotals' => true) for all rows of the streams in $name
 *  array('withRelatedToTotals' => array('*' => array('relationType', ...))) for particular rows of the streams in $name
 *  array('withRelatedToTotals' => array('streamName' => true)) for all rows
 *  array('withRelatedToTotals' => array('streamName' => array('relationType', ...))) for particular rows
 * @param {array} $streams The streams that were fetched
 * @return {array} The same $streams. On every stream that passes testReadLevel('relations'),
 *  "relatedToTotals" is set to
 *  array('relationType_1' => array('fromStreamType' => relationCount), 'relationType_2' => ...)
 */
private static function relatedToTotals($publisherId, $name, $options, $streams)
{
	$wanted = Q::ifset($options, 'withRelatedToTotals', null);
	// true is a valid value meaning "all rows", so it must get past this guard
	if (empty($wanted) or ($wanted !== true and !is_array($wanted))) {
		return $streams;
	}

	$trows = array();
	if ($wanted === true) {
		$trows = Streams_RelatedToTotal::select()->where(array(
			'toPublisherId' => $publisherId,
			'toStreamName' => $name
		))->fetchDbRows();
		$wanted = array(); // nothing was requested per stream name
	} elseif (isset($wanted['*'])) {
		$trows = Streams_RelatedToTotal::select()->where(array(
			'toPublisherId' => $publisherId,
			'toStreamName' => $name,
			'relationType' => $wanted['*']
		))->fetchDbRows();
		unset($wanted['*']);
	}

	// Group the stream names by the list of relation types requested for them,
	// so that each distinct list needs only one query. Every name has to be kept:
	// storing a single name per list would drop all but the last one.
	$infoForTotals = array();
	foreach ($wanted as $n => $mt) {
		if (!$mt) {
			continue;
		}
		if (!is_array($mt)) {
			$mt = array($mt);
		}
		ksort($mt);
		$j = json_encode($mt);
		if (!isset($infoForTotals[$j])) {
			$infoForTotals[$j] = array(array(), $mt);
		}
		$infoForTotals[$j][0][] = $n;
	}
	foreach ($infoForTotals as $info) {
		$criteria = array(
			'toPublisherId' => $publisherId,
			'toStreamName' => $info[0]
		);
		if ($info[1] !== array(true)) {
			// particular relations, as opposed to all relations
			$criteria['relationType'] = $info[1];
		}
		$frows = Streams_RelatedToTotal::select()->where($criteria)->fetchDbRows();
		$trows = array_merge($trows, $frows);
	}

	foreach ($streams as $s) {
		// Skip missing streams and those whose relations can't be read,
		// but keep going with the rest and still return $streams.
		if (!$s or !$s->testReadLevel('relations')) {
			continue;
		}
		$relatedToTotals = array();
		foreach ($trows as $row) {
			if ($row->toStreamName !== $s->name) {
				continue;
			}
			$relatedToTotals[$row->relationType][$row->fromStreamType] = $row->relationCount;
		}
		$s->set('relatedToTotals', $relatedToTotals);
	}
	return $streams;
}
/**
 * Sets "relatedFromTotals", with info about relations FROM streams,
 * on the streams that were fetched, if $options['withRelatedFromTotals'] asks for it.
 * @method relatedFromTotals
 * @static
 * @private
 * @param {string} $publisherId Stream publisher id
 * @param {string|array} $name Stream name or array of names
 * @param {array} $options Can be:
 *  array('withRelatedFromTotals' => true) for all rows of the streams in $name
 *  array('withRelatedFromTotals' => array('*' => array('relationType', ...))) for particular rows of the streams in $name
 *  array('withRelatedFromTotals' => array('streamName' => true)) for all rows
 *  array('withRelatedFromTotals' => array('streamName' => array('relationType', ...))) for particular rows
 * @param {array} $streams The streams that were fetched
 * @return {array} The same $streams. On every stream that passes testReadLevel('relations'),
 *  "relatedFromTotals" is set to
 *  array('relationType_1' => array('toStreamType' => relationCount), 'relationType_2' => ...)
 */
private static function relatedFromTotals($publisherId, $name, $options, $streams)
{
	$wanted = Q::ifset($options, 'withRelatedFromTotals', null);
	// true is a valid value meaning "all rows", so it must get past this guard
	if (empty($wanted) or ($wanted !== true and !is_array($wanted))) {
		return $streams;
	}

	$trows = array();
	if ($wanted === true) {
		$trows = Streams_RelatedFromTotal::select()->where(array(
			'fromPublisherId' => $publisherId,
			'fromStreamName' => $name
		))->fetchDbRows();
		$wanted = array(); // nothing was requested per stream name
	} elseif (isset($wanted['*'])) {
		$trows = Streams_RelatedFromTotal::select()->where(array(
			'fromPublisherId' => $publisherId,
			'fromStreamName' => $name,
			'relationType' => $wanted['*']
		))->fetchDbRows();
		unset($wanted['*']);
	}

	// Group the stream names by the list of relation types requested for them,
	// so that each distinct list needs only one query. Every name has to be kept:
	// storing a single name per list would drop all but the last one.
	$infoForTotals = array();
	foreach ($wanted as $n => $mt) {
		if (!$mt) {
			continue;
		}
		if (!is_array($mt)) {
			$mt = array($mt);
		}
		ksort($mt);
		$j = json_encode($mt);
		if (!isset($infoForTotals[$j])) {
			$infoForTotals[$j] = array(array(), $mt);
		}
		$infoForTotals[$j][0][] = $n;
	}
	foreach ($infoForTotals as $info) {
		$criteria = array(
			'fromPublisherId' => $publisherId,
			'fromStreamName' => $info[0]
		);
		if ($info[1] !== array(true)) {
			// particular relations, as opposed to all relations
			$criteria['relationType'] = $info[1];
		}
		$frows = Streams_RelatedFromTotal::select()->where($criteria)->fetchDbRows();
		$trows = array_merge($trows, $frows);
	}

	foreach ($streams as $s) {
		// Skip missing streams and those whose relations can't be read,
		// but keep going with the rest and still return $streams.
		if (!$s or !$s->testReadLevel('relations')) {
			continue;
		}
		$relatedFromTotals = array();
		foreach ($trows as $row) {
			if ($row->fromStreamName !== $s->name) {
				continue;
			}
			$relatedFromTotals[$row->relationType][$row->toStreamType] = $row->relationCount;
		}
		$s->set('relatedFromTotals', $relatedFromTotals);
	}
	return $streams;
}
/**
 * Static array, initially empty.
 * NOTE(review): its use is not visible in this part of the file;
 * presumably bookkeeping for stream fetching -- confirm.
 * @property $fetch
 * @static
 * @type array
 * @protected
 */
protected static $fetch = array();
/**
 * Static array, initially empty.
 * NOTE(review): presumably a general-purpose cache shared by the
 * static methods of this class -- confirm against its users.
 * @property $cache
 * @static
 * @type array
 */
static $cache = array();
/**
 * Initially null.
 * NOTE(review): presumably the invite that was followed during
 * the current request, if any -- confirm.
 * @property $followedInvite
 * @static
 * @type Streams_Invite
 */
static $followedInvite = null;
/**
 * Initially null.
 * NOTE(review): presumably replaces the requested publisher id
 * when set -- confirm against the code that reads it.
 * @property $requestedPublisherId_override
 * @static
 * @type string
 */
static $requestedPublisherId_override = null;
/**
 * Initially null.
 * NOTE(review): presumably replaces the requested stream name
 * when set -- confirm against the code that reads it.
 * @property $requestedName_override
 * @static
 * @type string
 */
static $requestedName_override = null;
/**
 * Initially null.
 * NOTE(review): undocumented in the original; presumably refers to
 * the stream that is currently being saved -- confirm.
 * @property $beingSaved
 * @static
 */
static $beingSaved = null;
/**
 * Initially null.
 * NOTE(review): undocumented in the original; presumably the query
 * associated with the save that is in progress -- confirm.
 * @property $beingSavedQuery
 * @static
 */
static $beingSavedQuery = null;
/**
 * You can set this to true to prevent caching for a while,
 * e.g. during installer scripts, but make sure to set it back to false when done.
 * @property $dontCache
 * @static
 * @type boolean
 * @default false
 */
static $dontCache = false;
};