Show:

File: platform/plugins/Streams/classes/Streams.js

"use strict";
/*jshint node:true */
/**
 * Streams model
 * @module Streams
 * @main Streams
 */
var Q = require('Q');
var fs = require('fs');

/**
 * Static methods for the Streams model
 * @class Streams
 * @extends Base.Streams
 * @static
 */
// Namespace object for the plugin: never instantiated with state here,
// it only carries the static methods and constants assigned below.
function Streams() { }
module.exports = Streams;

// Pull everything defined on the Base/Streams module into Streams.
var Base_Streams = require('Base/Streams');
Q.mixin(Streams, Base_Streams);


/*
 * This is where you would place all the static methods for the models,
 * the ones that don't strongly pertain to a particular row or table.
 * Just assign them as methods of the Streams object.
 
 * * * */

// The Users plugin must be loaded before this module: Users.Socket,
// Users.Session, Users.clients and Users.on() are all used below.
if (!Q.plugins.Users) {
	throw new Q.Exception("Streams: Users plugin is required");
}

var util = require('util');
var Db = Q.require('Db');
var Users = Q.plugins.Users;
// Assigned by Streams.listen() once the external socket server is started.
var socket = null;

// Gives Streams the event-emitter interface (on/emit).
Q.makeEventEmitter(Streams);

/**
 * Read levels
 * @property READ_LEVEL
 * @type Object
 */
/**
 * Can't see the stream
 * @property READ_LEVEL.none
 * @type integer
 * @default 0
 * @final
 */
/**
 * Can see icon and title
 * @property READ_LEVEL.see
 * @type integer
 * @default 10
 * @final
 */
/**
 * Can see the stream's content
 * @property READ_LEVEL.content
 * @type integer
 * @default 20
 * @final
 */
/**
 * Can see relations to other streams
 * @property READ_LEVEL.relations
 * @type integer
 * @default 25
 * @final
 */
/**
 * Can see participants in the stream
 * @property READ_LEVEL.participants
 * @type integer
 * @default 30
 * @final
 */
/**
 * Can play stream in a player
 * @property READ_LEVEL.messages
 * @type integer
 * @default 40
 * @final
 */
/**
 * Max read level
 * @property READ_LEVEL.max
 * @type integer
 * @default 40
 * @final
 */
Streams.READ_LEVEL = {
	// can't see the stream
	none: 0,
	// can see icon and title
	see: 10,
	// can preview stream and its content
	content: 20,
	// can see relations to other streams
	relations: 25,
	// can see participants in the stream
	participants: 30,
	// can play stream in a player
	messages: 40,
	// max read level
	max: 40
};
/**
 * Write levels
 * @property WRITE_LEVEL
 * @type Object
 */
/**
 * Cannot affect stream or participants list
 * @property WRITE_LEVEL.none
 * @type integer
 * @default 0
 * @final
 */
/**
 * Can become a participant, chat, and leave
 * @property WRITE_LEVEL.join
 * @type integer
 * @default 10
 * @final
 */
/**
 * Can vote for a relation message posted to the stream.
 * @property WRITE_LEVEL.vote
 * @type integer
 * @default 13
 * @final
 */
/**
 * Can post messages, but manager must approve
 * @property WRITE_LEVEL.postPending
 * @type integer
 * @default 18
 * @final
 */
/**
 * Can post messages which appear immediately
 * @property WRITE_LEVEL.post
 * @type integer
 * @default 20
 * @final
 */
/**
 * Can post messages relating other streams to this one
 * @property WRITE_LEVEL.relate
 * @type integer
 * @default 23
 * @final
 */
/**
 * Can update properties of relations directly
 * @property WRITE_LEVEL.relations
 * @type integer
 * @default 25
 * @final
 */
/**
 * Can post messages requesting edits of stream
 * @property WRITE_LEVEL.suggest
 * @type integer
 * @default 28
 * @final
 */
/**
 * Can post messages to edit stream content immediately
 * @property WRITE_LEVEL.edit
 * @type integer
 * @default 30
 * @final
 */
/**
 * Can post a message requesting to close the stream
 * @property WRITE_LEVEL.closePending
 * @type integer
 * @default 35
 * @final
 */
/**
 * Don't delete, just prevent any new changes to stream
 * however, joining and leaving is still ok
 * @property WRITE_LEVEL.close
 * @type integer
 * @default 40
 * @final
 */
/**
 * Max write level
 * @property WRITE_LEVEL.max
 * @type integer
 * @default 40
 * @final
 */
Streams.WRITE_LEVEL = {
	// cannot affect stream or participants list
	none: 0,
	// can become a participant, chat, and leave
	join: 10,
	// can vote for a relation message posted to the stream
	vote: 13,
	// can post messages which require manager's approval
	postPending: 18,
	// can post messages which take effect immediately
	post: 20,
	// can relate other streams to this one
	relate: 23,
	// can update properties of relations directly
	relations: 25,
	// can suggest edits of stream
	suggest: 28,
	// can edit stream content immediately
	edit: 30,
	// can post a message requesting to close the stream
	closePending: 35,
	// don't delete, just prevent any new changes to stream;
	// however, joining and leaving is still ok
	close: 40,
	// max write level
	max: 40
};
/**
 * Admin levels
 * @property ADMIN_LEVEL
 * @type Object
 */
/**
 * Cannot do anything related to admin / users
 * @property ADMIN_LEVEL.none
 * @type integer
 * @default 0
 * @final
 */
/**
 * Can prove things about the stream's content or participants
 * @property ADMIN_LEVEL.tell
 * @type integer
 * @default 10
 * @final
 */
/**
 * Able to create invitations for others, granting access
 * and permissions up to what they themselves have
 * @property ADMIN_LEVEL.invite
 * @type integer
 * @default 20
 * @final
 */
/**
 * Can approve posts, and give people any adminLevel < 'manage'
 * @property ADMIN_LEVEL.manage
 * @type integer
 * @default 30
 * @final
 */
/**
 * Can give people any adminLevel <= 'own'
 * @property ADMIN_LEVEL.own
 * @type integer
 * @default 40
 * @final
 */
/**
 * Max admin level
 * @property ADMIN_LEVEL.max
 * @type integer
 * @default 40
 * @final
 */
Streams.ADMIN_LEVEL = {
	// cannot do anything related to admin / users
	none: 0,
	// can prove things about the stream's content or participants
	tell: 10,
	// able to create invitations for others, granting access
	invite: 20,
	// can approve posts and give people any adminLevel < 30
	manage: 30,
	// can give people any adminLevel <= 40
	own: 40,
	// max admin level
	max: 40
};
/**
 * Access sources
 * @property ACCESS_SOURCES
 * @type object
 */
/**
 * Public access
 * @config ACCESS_SOURCES['public']
 * @type integer
 * @default 0
 * @final
 */
/**
 * From contact
 * @config ACCESS_SOURCES['contact']
 * @type integer
 * @default 1
 * @final
 */
/**
 * Direct access
 * @config ACCESS_SOURCES['direct']
 * @type integer
 * @default 2
 * @final
 */
/**
 * Inherited public access
 * @config ACCESS_SOURCES['inherited_public']
 * @type integer
 * @default 3
 * @final
 */
/**
 * Inherited from contact
 * @config ACCESS_SOURCES['inherited_contact']
 * @type integer
 * @default 4
 * @final
 */
/**
 * Inherited direct access
 * @config ACCESS_SOURCES['inherited_direct']
 * @type integer
 * @default 5
 * @final
 */
Streams.ACCESS_SOURCES = {
	// access granted to everyone
	public: 0,
	// access granted through a contact label
	contact: 1,
	// access granted directly to the user
	direct: 2,
	// the same three sources, but inherited
	inherited_public: 3,
	inherited_contact: 4,
	inherited_direct: 5
};

// Registry of custom stream constructors, keyed by normalized stream type.
// Populated by Streams.define().
Streams.defined = {};

/**
 * Call this function to set a constructor for a stream type
 * @static
 * @method define
 * @param {String} type The type of the message, e.g. "Streams/chat/message"
 * @param {String|Function} ctor Your message's constructor, or path to a javascript file which will define it
 * @param {Object} methods An optional hash of methods. You can also override methods of the
 *  Stream object, such as "url".
 */
Streams.define = function (type, ctor, methods) {
	// Batch form: a hash of {type: ctor} pairs, each registered on its own
	// (no methods hash is forwarded in this form).
	if (typeof type === 'object') {
		for (var typeName in type) {
			Streams.define(typeName, type[typeName]);
		}
		return;
	}

	var normalizedType = Q.normalize(type);
	if (typeof ctor !== 'function') {
		throw new Q.Error("Streams.Stream.define requires ctor to be a function");
	}

	// Wrapper that first runs the mixed-in constructors, then the
	// caller-supplied constructor, with the same arguments.
	function CustomStreamConstructor() {
		var args = arguments;
		CustomStreamConstructor.constructors.apply(this, args);
		ctor.apply(this, args);
	}
	Q.mixin(CustomStreamConstructor, Streams.Stream);
	Q.extend(CustomStreamConstructor.prototype, methods);

	Streams.defined[normalizedType] = CustomStreamConstructor;
	return CustomStreamConstructor;
};

/**
 * Start internal listener for Streams plugin. Accepts messages such as<br/>
 * "Streams/Stream/join",
 * "Streams/Stream/leave",
 * "Streams/Stream/create",
 * "Streams/Stream/remove",
 * "Streams/Message/post",
 * "Streams/Message/postMessages",
 * "Streams/Stream/invite"
 * @method listen
 * @static
 * @param {Object} options={} So far no options are implemented.
 * @return {Boolean} Whether the server has started
 */
Streams.listen = function (options) {

	// Start internal server
	var server = Q.listen();
	server.attached.express.post('/Q/node', Streams_request_handler);

	// Start external socket server
	var node = Q.Config.get(['Q', 'node']);
	if (!node) {
		return false;
	}
	var pubHost = Q.Config.get(['Streams', 'node', 'host'], Q.Config.get(['Q', 'node', 'host'], null));
	var pubPort = Q.Config.get(['Streams', 'node', 'port'], Q.Config.get(['Q', 'node', 'port'], null));

	if (pubHost === null) {
		throw new Q.Exception("Streams: Missing config field: Streams/node/host");
	}
	if (pubPort === null) {
		throw new Q.Exception("Streams: Missing config field: Streams/node/port");
	}

	// Handle messages being posted to streams
	Streams.Stream.on('post', function (stream, byUserId, msg, clientId) {
		if (!stream) {
			return console.error("Streams.Stream.on POST: invalid stream!!!");
		}

		if (_messageHandlers[msg.fields.type]) {
			_messageHandlers[msg.fields.type].call(this, msg);
		}

		Streams.Stream.emit('post/'+msg.fields.type, stream, byUserId, msg);
		stream.notifyParticipants('Streams/post', byUserId, msg);
	});

	/**
	 * @property socketServer
	 * @type {SocketNamespace}
	 * @private
	 */
	socket = Users.Socket.listen({
		host: pubHost,
		port: pubPort,
		https: Q.Config.get(['Q', 'node', 'https'], false) || {},
	});

	socket.io.of('/Users').on('connection', function(client) {
		Q.log("Socket.IO client connected " + client.id);
		if (client.alreadyListeningStreams) {
			return;
		}
		client.alreadyListeningStreams = true;
		client.on('Streams/observe',
		function (sessionId, clientId, publisherId, streamName, fn) {
			if (!_validateSessionId(sessionId, fn)) {
				return;
			}
			if (typeof publisherId !== 'string'
			|| typeof streamName !== 'string') {
				return fn && fn({
					type: 'Streams.Exception.BadArguments',
					message: 'Bad arguments'
				});
			}
			var observer = Q.getObject(
				[publisherId, streamName, client.id], Streams.observers
			);
			if (observer) {
				return fn && fn(null, true);
			}
			Streams.fetchOne('', publisherId, streamName, function (err, stream) {
				if (err || !stream) {
					return fn && fn({
						type: 'Users.Exception.NotAuthorized',
						message: 'not authorized'
					});
				}
				stream.testReadLevel('messages', function (err, allowed) {
					if (err || !allowed) {
						return fn && fn({
							type: 'Users.Exception.NotAuthorized',
							message: 'not authorized'
						});
					}
					var clients = Q.getObject(
						[publisherId, streamName], Streams.observers, null, {}
					);
					var max = Streams.Stream.getConfigField(
						stream.fields.type,
						'observersMax'
					);
					if (max && Object.keys(clients).length >= max - 1) {
						return fn && fn({
							type: 'Streams.Exception.TooManyObservers',
							message: 'too many observers already'
						});
					}
					clients[client.id] = client;
					Q.setObject(
						[client.id, publisherId, streamName], true, Streams.observing
					);
					return fn && fn(null, true);
				});
			});
		});
		client.on('Streams/neglect',
		function (sessionId, clientId, publisherId, streamName, fn) {
			if (!_validateSessionId(sessionId, fn)) {
				return;
			}
			var o = Streams.observers;
			if (!Q.getObject([publisherId, streamName, client.id], o)) {
				return fn && fn(null, false);
			}
			delete o[publisherId][streamName][client.id];
			delete Streams.observing[client.id][publisherId][streamName];
			return fn && fn(null, true);
		});
		client.on('disconnect', function () {
			var observing = Streams.observing[client.id];
			if (!observing) {
				return;
			}
			for (var publisherId in observing) {
				var p = observing[publisherId];
				for (var streamName in p) {
					delete Streams.observers[publisherId][streamName][client.id];
				}
			}
			delete Streams.observing[client.id];
		});
	});
	return true;
};

/**
 * Stores socket.io clients observing streams,
 * indexed as observers[publisherId][streamName][clientId] = client
 * @property observers
 * @type {Object}
 */ 
Streams.observers = {};

/**
 * Stores streams that socket.io clients are observing,
 * indexed as observing[clientId][publisherId][streamName] = true
 * @property observing
 * @type {Object}
 */ 
Streams.observing = {};

/**
 * Checks a session id by decoding it with Users.Session.decodeId.
 * When the first element of the decoded result is falsy, the optional
 * callback is invoked with a 'Users.Exception.BadSessionId' error object.
 * @param {String} sessionId The session id received from the client
 * @param {Function} [fn] Callback to notify on failure
 * @return {Boolean} true if the session id was accepted, false otherwise
 */
function _validateSessionId(sessionId, fn) {
	var decoded = Users.Session.decodeId(sessionId);
	if (!decoded[0]) {
		if (fn) {
			fn({
				type: 'Users.Exception.BadSessionId',
				message: 'bad session id'
			});
		}
		return false;
	}
	return true;
}

/**
 * Express handler for internal requests POSTed to "/Q/node".
 * Dispatches on the "Q/method" field of the request body and emits the
 * corresponding Streams.Stream events. Every method except
 * "Streams/Stream/invite" ends by calling next(); the invite method sends
 * its own response and therefore never calls next() afterwards.
 * @method Streams_request_handler
 * @private
 * @param {Object} req The request; req.body holds the posted fields
 * @param {Object} res The response
 * @param {Function} next Passes control to the next handler
 */
function Streams_request_handler (req, res, next) {
	var parsed = req.body;
	if (!parsed || !parsed['Q/method']) {
		return next();
	}
	var participant, msg, posted, streams, k, cid;
	var userId, sessionId, clients;
	var userIds, invitingUserId, appUrl;
	var readLevel, writeLevel, adminLevel, permissions, displayName, expireTime, logKey;
	var clientId = parsed["Q.clientId"];
	// stream stays falsy when the request carries no "stream" field;
	// the Streams/* cases below assume the sender included it.
	var stream = parsed.stream
		&& Streams.Stream.construct(JSON.parse(parsed.stream), true);
	switch (parsed['Q/method']) {
		case 'Users/device':
			break;
		case 'Users/logout':
			userId = parsed.userId;
			sessionId = parsed.sessionId;
			if (userId && sessionId) {
				// disconnect every socket client that belongs to this session
				clients = Users.clients[userId];
				for (cid in clients) {
					if (clients[cid].sessionId === sessionId) {
						clients[cid].disconnect();
					}
				}
			}
			Users.pushNotifications(userId, {
				badge: 0
			});
			break;
		case 'Streams/Stream/join':
			participant = new Streams.Participant(JSON.parse(parsed.participant));
			participant.fillMagicFields();
			userId = participant.fields.userId;
			if (Q.Config.get(['Streams', 'logging'], false)) {
				Q.log('Streams.listen: Streams/Stream/join {'
					+ '"publisherId": "' + stream.fields.publisherId
					+ '", "name": "' + stream.fields.name
					+ '"}'
				);
			}
			// inform user's clients about change
			Users.Socket.emitToUser(userId, 'Streams/join', participant);
			Streams.Stream.emit('join', stream, userId, clientId);
			break;
		case 'Streams/Stream/visit':
			participant = JSON.parse(parsed.participant);
			userId = participant.userId;
			Streams.Stream.emit('visit', stream, userId, clientId);
			break;
		case 'Streams/Stream/leave':
			participant = new Streams.Participant(JSON.parse(parsed.participant));
			participant.fillMagicFields();
			userId = participant.fields.userId;
			if (Q.Config.get(['Streams', 'logging'], false)) {
				Q.log('Streams.listen: Streams/Stream/leave {'
					+ '"publisherId": "' + stream.fields.publisherId
					+ '", "name": "' + stream.fields.name
					+ '"}'
				);
			}
			// inform user's clients about change
			Users.Socket.emitToUser(userId, 'Streams/leave', participant);
			Streams.Stream.emit('leave', stream, userId, clientId);
			break;
		case 'Streams/Stream/remove':
			if (Q.Config.get(['Streams', 'logging'], false)) {
				Q.log('Streams.listen: Streams/Stream/remove {'
					+ '"publisherId": "' + stream.fields.publisherId
					+ '", "name": "' + stream.fields.name
					+ '"}'
				);
			}
			// tell the participants that the stream is gone
			stream.notifyParticipants('Streams/remove', null, {
				publisherId: stream.fields.publisherId, 
				name: stream.fields.name
			});
			Streams.Stream.emit('remove', stream, clientId);
			break;
		case 'Streams/Stream/create':
			if (Q.Config.get(['Streams', 'logging'], false)) {
				Q.log('Streams.listen: Streams/Stream/create {'
					+ '"publisherId": "' + stream.fields.publisherId
					+ '", "name": "' + stream.fields.name
					+ '"}'
				);
			}
			Streams.Stream.emit('create', stream, clientId);
			// no need to notify anyone
			break;
		case 'Streams/Message/post':
			msg = Streams.Message.construct(JSON.parse(parsed.message), true);
			msg.fillMagicFields();
			if (Q.Config.get(['Streams', 'logging'], false)) {
				Q.log('Streams.listen: Streams/Message/post {'
					+ '"publisherId": "' + stream.fields.publisherId
					+ '", "name": "' + stream.fields.name
					+ '", "msg.type": "' + msg.fields.type
					+ '"}'
				);
			}
			Streams.Stream.emit('post', stream, msg.fields.byUserId, msg, clientId);
			break;
		case 'Streams/Message/postMessages':
			posted = JSON.parse(parsed.posted);
			streams = parsed.streams && JSON.parse(parsed.streams);
			if (!streams) break;
			// each posted message is matched with its own stream,
			// looked up as streams[publisherId][streamName]
			for (k in posted) {
				msg = Streams.Message.construct(posted[k], true);
				msg.fillMagicFields();
				stream = Streams.Stream.construct(
					streams[msg.fields.publisherId][msg.fields.streamName], true
				);
				if (Q.Config.get(['Streams', 'logging'], false)) {
					Q.log('Streams.listen: Streams/Message/post {'
						+ '"publisherId": "' + stream.fields.publisherId
						+ '", "name": "' + stream.fields.name
						+ '", "msg.type": "' + msg.fields.type
						+ '"}'
					);
				}
				Streams.Stream.emit('post', stream, msg.fields.byUserId, msg, clientId);
			}
			break;
		case 'Streams/Stream/invite':
			try {
				userIds = JSON.parse(parsed.userIds);
				invitingUserId = parsed.invitingUserId;
				appUrl = parsed.appUrl;
				readLevel = parsed.readLevel || null;
				writeLevel = parsed.writeLevel || null;
				adminLevel = parsed.adminLevel || null;
				permissions = parsed.permissions || null;
				displayName = parsed.displayName || '';
				// expireTime arrives in seconds; Date expects milliseconds
				expireTime = parsed.expireTime ? new Date(parsed.expireTime*1000) : null;
			} catch (e) {
				return res.send({data: false});
			}
			// Respond right away; the invitations are persisted afterwards
			res.send({data: true});
			if (logKey = Q.Config.get(['Streams', 'logging'], false)) {
				Q.log(
				    'Streams.listen: Streams/Stream/invite {'
					+ '"publisherId": "' + stream.fields.publisherId
					+ '", "name": "' + stream.fields.name
					+ '", "userIds": ' + parsed.userIds
					+ '}',
					logKey
				);
			}

			if (expireTime && expireTime <= new Date()) {
				// Already expired: nothing to persist. The response has been
				// sent above, so return instead of falling through to next().
				return;
			}
			
			persist();
			
			return;
		default:
			break;
	}
	return next();
	
	/**
	 * For each invited user who is not yet participating, saves in sequence
	 * a Streams.Invite, a Streams.Invited and a Streams.Participant row
	 * (state "invited"), then posts a "Streams/invite" message to that
	 * user's "Streams/invited" stream. Failures are logged and abort the
	 * chain for that user only.
	 */
	function persist () {
	
		Q.each(userIds, function (i, userId) {
			var token = null;
			var invite;
							
		    // TODO: Change this to a getter, so that we can do throttling in case there are too many userIds
			(new Streams.Participant({
				"publisherId": stream.fields.publisherId,
				"streamName": stream.fields.name,
				"userId": userId,
				"state": "participating"
			})).retrieve(_participant);
			
			function _participant(err, rows) {
				if (rows && rows.length) {
					// User is already a participant in the stream.
					return;
				}
				(new Streams.Invite({
					"userId": userId,
					"state": "pending",
					"publisherId": stream.fields.publisherId,
					"streamName": stream.fields.name,
					"invitingUserId": invitingUserId,
					"displayName": displayName,
					"appUrl": appUrl,
					"readLevel": readLevel,
					"writeLevel": writeLevel,
					"adminLevel": adminLevel,
					"permissions": permissions,
					"expireTime": expireTime
				})).save(_inviteSaved);
			}

			function _inviteSaved(err) {
				if (err) {
					Q.log("ERROR: Failed to save Streams.Invite for user '"+userId+"' during invite");
					Q.log(err);
					return;
				}
				// "this" is the saved invite row
				token = this.fields.token;
				invite = this;
				// now ready to save Streams.Invited row
				(new Streams.Invited({
					"token": token,
					"userId": userId,
					"state": "pending",
					"expireTime": expireTime
				})).save(_invitedSaved);
			}

			function _invitedSaved(err) {
				if (err) {
					Q.log("ERROR: Failed to save Streams.Invited for user '"+userId+"' during invite");
					Q.log(err);
					return;
				}
				(new Streams.Participant({
					"publisherId": stream.fields.publisherId,
					"streamName": stream.fields.name,
					"streamType": stream.fields.type,
					"userId": userId,
					"state": "invited",
					"reason": ""
				})).save(true, _participantSaved);
				
				// Write some files, if requested
				// SECURITY: Here we trust the input, which should only be sent internally
				if (parsed.template) {
					new Users.User({id: userId})
					.retrieve(function () {
						// "this" is the retrieved user row
						var fields = Q.extend({}, parsed, {
							stream: stream,
							user: this,
							invite: invite,
							link: invite.url(),
							app: Q.app,
							communityId: Users.communityId(),
							communityName: Users.communityName(),
							appRootUrl: Q.Config.expect(['Q', 'web', 'appRootUrl'])
						});
						var html = Q.Handlebars.render(parsed.template, fields);
						var path = Streams.invitationsPath(invitingUserId)
							+'/'+parsed.batchName;
						var filename = path + '/'
							+ Q.normalize(stream.fields.publisherId) + '-'
							+ Q.normalize(stream.fields.name) + '-'
							+ this.fields.id + '.html';
						fs.writeFile(filename, html, function (err) {
							if (err) {
								Q.log(err);
							}
						});
					});
				}
			}

			function _participantSaved(err) {
				if (err) {
					Q.log("ERROR: Failed to save Streams.Participant for user '"+userId+"' during invite");
					Q.log(err);
					return;
				}

				// Now post a message to Streams/invited stream
				Streams.fetchOne(invitingUserId, userId, 'Streams/invited', _stream);
			}

			function _stream(err, invited) {
				// fetchOne reports a missing stream as (null, null),
				// so a falsy stream is handled like an error.
				if (err || !invited) {
					Q.log("ERROR: Failed to get invited stream for user '"+userId+"' during invite");
					if (err) {
						Q.log(err);
					}
					return;
				}
				Streams.Stream.emit('invite', invited.getFields(), userId, stream);
				if (!invited.testWriteLevel('post')) {
					Q.log("ERROR: Not authorized to post to invited stream for user '"+userId+"' during invite");
					return;
				}
				var invitedUrl = Streams.invitedUrl(token);
				// Kept local: displayName is shared by every user in this
				// batch and is also stored on Invite rows saved later.
				var inviterName = displayName || "Someone";
				var msg = {
					publisherId: invited.fields.publisherId,
					streamName: invited.fields.name,
					byUserId: invitingUserId,
					type: 'Streams/invite',
					sentTime: new Db.Expression("CURRENT_TIMESTAMP"),
					state: 'posted',
					content: inviterName + " invited you to " + invitedUrl,
					instructions: JSON.stringify({
						token: token,
						displayName: inviterName,
						appUrl: appUrl,
						invitedUrl: invitedUrl,
						type: stream.fields.type,
						title: stream.fields.title,
						content: stream.fields.content
					})
				};
				invited.post(msg, function (err) {
					if (err) {
						Q.log("ERROR: Failed to save message for user '"+userId+"' during invite");
						Q.log(err);
					}
				});
			}
		});

    }
}

// A socket.io client connected. If the user was not online before,
// post a "Streams/connected" message to their Streams/participating stream.
Users.on('connected', function(client, wasOnline) {
	if (wasOnline) {
		return;
	}
	var participating = new Streams.Stream({
		publisherId: client.userId,
		name: 'Streams/participating'
	});
	var message = {
		type: 'Streams/connected'
	};
	participating.post(client.userId, message, function(err) {
		if (err) {
			console.error(err);
		}
	});
});

// A user went offline: post a "Streams/disconnected" message to their
// Streams/participating stream, then log the disconnect (even if the
// post reported an error).
Users.on('disconnected', function (userId) {
	var participating = new Streams.Stream({
		publisherId: userId,
		name: 'Streams/participating'
	});
	var message = {
		byUserId: userId,
		type: 'Streams/disconnected'
	};
	participating.post(message, function(err) {
		if (err) {
			console.error(err);
		}
		Q.log('User disconnected: ' + userId);
	});
});

/**
 * Retrieve stream participants
 * @method getParticipants
 * @static
 * @param {String} publisherId The publisher Id
 * @param {String} streamName The name of the stream
 * @param {Function} [callback=null] Callback receives a map of {userId: participant} pairs
 */
Streams.getParticipants = function(publisherId, streamName, callback) {
	// Without a callback there is nobody to hand the result to
	if (!callback) return;

	var criteria = {
		publisherId: publisherId,
		streamName: streamName
	};
	Streams.Participant.SELECT('*').where(criteria).execute(_selected);

	function _selected(err, rows) {
		if (err) {
			// Errors are logged and reported to the caller as an empty map
			Q.log(err);
			callback({});
			return;
		}
		// Index the participant rows by userId
		var participants = {};
		for (var idx = 0; idx < rows.length; idx++) {
			var row = rows[idx];
			participants[row.fields.userId] = row;
		}
		callback(participants);
	}
};

/**
 * Retrieve socket.io clients registered to observe the stream
 * by sending "Streams/observe" events through the socket.
 * @method getObservers
 * @static
 * @param {String} publisherId The publisher Id
 * @param {String} streamName The name of the stream
 * @param {Function} [callback=null] Callback receives a map of {clientId: socketClient} pairs
 */
Streams.getObservers = function(publisherId, streamName, callback) {
	// Look up Streams.observers[publisherId][streamName]
	var found = Q.getObject([publisherId, streamName], Streams.observers);
	if (callback) {
		// An empty map is passed when nothing is registered for the stream
		callback(found ? found : {});
	}
};

/**
 * Retrieve stream with calculated access rights
 * @method fetch
 * @static
 * @param {String}  asUserId
 *	The user id to calculate access rights
 * @param {String} publisherId
 *	The publisher Id
 * @param {String|Array|Db.Range} streamName
 *	The name of the stream, or an array of names, or a Db.Range
 * @param callback=null {function}
 *	Callback receives (err, streams) as parameters
 * @param {String} [fields='*']
 *  Comma delimited list of fields to retrieve in the stream.
 *  Must include at least "publisherId" and "name".
 *  since make up the primary key of the stream table.
 *  You can skip this argument if you want.
 * @param {Object} [options={}]
 *  Provide additional query options like 'limit', 'offset', 'orderBy', 'where' etc.
 *  @see Db_Query_Mysql::options().
 */
Streams.fetch = function (asUserId, publisherId, streamName, callback, fields, options) {
	if (!callback) return;
	if (!publisherId || !streamName) {
		return callback(new Error("Wrong arguments"));
	}

	// A string name ending in "/" is treated as a prefix and turned into a
	// range from the prefix (inclusive) up to the same prefix with its
	// trailing "/" replaced by "0" (exclusive).
	var isPrefix = typeof streamName.charAt === 'function'
		&& streamName.charAt(streamName.length-1) === '/';
	if (isPrefix) {
		streamName = new Db.Range(streamName, true, false, streamName.slice(0, -1)+'0');
	}

	// The fields argument may be skipped, with options passed in its place
	if (Q.isPlainObject(fields)) {
		options = fields;
		fields = '*';
	}
	if (!fields) {
		fields = '*';
	}

	Streams.Stream.SELECT(fields)
	.where({publisherId: publisherId, name: streamName})
	.options(options)
	.execute(_fetched);

	// Calculate access for every fetched stream, collecting the results in a pipe
	function _fetched(err, rows) {
		if (err) {
		    return callback(err);
		}
		if (!rows.length) {
		    return callback(null, []);
		}
		var names = rows.map(function (row) {
			return row.fields.name;
		});
		var pipe = new Q.Pipe(names, _accessCalculated);
		for (var idx = 0; idx < rows.length; idx++) {
			var row = rows[idx];
			row.calculateAccess(asUserId, pipe.fill(row.fields.name));
		}
	}

	// Runs once the pipe has been filled for every stream name
	function _accessCalculated(params, subjects) {
		for (var name in params) {
			var failure = params[name][0];
			if (failure) {
				callback(failure); // there was an error
				return;
			}
		}
		callback(null, subjects); // success
	}
};

/**
 * Retrieve stream with calculated access rights
 * @method fetchOne
 * @static
 * @param {String} asUserId
 *	The user id to calculate access rights
 * @param {String} publisherId
 *	The publisher Id
 * @param {String} streamName
 *	The name of the stream
 * @param {Function} [callback=null]
 *	Callback receives the (err, stream) as parameters
 * @param {String} [fields='*']
 *  Comma delimited list of fields to retrieve in the stream.
 *  Must include at least "publisherId" and "name".
 *  since make up the primary key of the stream table.
 *  You can skip this argument if you want.
 * @param {Object} [options={}]
 *  Provide additional query options like 'limit', 'offset', 'orderBy', 'where' etc.
 *  @see Db_Query_Mysql::options().
 */
Streams.fetchOne = function (asUserId, publisherId, streamName, callback, fields, options) {
	if (!callback) return;
	// Unlike Streams.fetch, both identifiers must be non-empty strings
	if (!publisherId || !streamName
	|| typeof publisherId !== 'string'
	|| typeof streamName !== 'string') {
		return callback(new Error("Wrong arguments"));
	}
	// The fields argument may be skipped, with options passed in its place.
	// Note that the query below always selects '*', whatever fields holds.
	if (Q.isPlainObject(fields)) {
		options = fields;
		fields = '*';
	}
	Streams.Stream.SELECT('*')
	.where({publisherId: publisherId, name: streamName})
	.options(options)
	.limit(1).execute(function(err, res) {
		if (err) {
		    return callback(err);
		}
		if (!res.length) {
			// No such stream: report (null, null) and stop here.
			// Without this return, res[0] below would be undefined.
		    return callback(null, null);
		}
		var stream = res[0];
		stream.calculateAccess(asUserId, function (err) {
			// Propagate access-calculation errors, as Streams.fetch does
			if (err) {
				return callback(err);
			}
		    callback.call(stream, null, stream);
		});
	});
};

/**
 * Register a message handler
 * @method messageHandler
 * @static
 * @param {String} msgType
 *	Type of message
 * @param {Function} callback
 *	The handler for stream messages
 */
Streams.messageHandler = function(msgType, callback) {
	// Getter form: called without a callback, returns the registered handler
	if (callback === undefined) {
		return _messageHandlers[msgType];
	}
	// Setter form: the handler must be a function
	if (typeof callback === 'function') {
		_messageHandlers[msgType] = callback;
		return;
	}
	throw new Q.Exception("Streams: callback passed to messageHandler is not a function");
};

/**
 * Calculate the url of a stream's icon
 * @static
 * @method iconUrl
 * @param {String} icon the value of the stream's "icon" field
 * @param {Number} [size=40] the size of the icon to render. Defaults to 40.
 * @return {String} the url
 */
Streams.iconUrl = function(icon, size) {
	if (!icon) {
		console.warn("Streams.iconUrl: icon is empty");
		return '';
	}
	// A missing size, or a literal true, means the default of 40
	var basename = (!size || size === true) ? '40' : size;
	// Append ".png" unless the size already contains a dot
	if (String(basename).indexOf('.') < 0) {
		basename = basename + '.png';
	}
	var src = Q.interpolateUrl(icon + '/' + basename);
	// Full urls, and icons starting with "{{", are returned as they are;
	// anything else is resolved under the Streams plugin's icons folder.
	if (src.isUrl() || icon.substr(0, 2) == '{{') {
		return src;
	}
	return Q.url('{{Streams}}/img/icons/'+src);
};

/**
 * Builds the url for an invite token: the Streams/invites/baseUrl config
 * value (default "i") passed through Q.url, followed by "/" and the token.
 * @static
 * @method invitedUrl
 * @param {String} token The invite token
 * @return {String} the url
 */
Streams.invitedUrl = function _Streams_invitedUrl(token) {
	var baseUrl = Q.Config.get(['Streams', 'invites', 'baseUrl'], "i");
	return Q.url(baseUrl) + "/" + token;
};

/**
 * Builds the directory path under Q.app.FILES_DIR in which invitation
 * files for a given user are stored. The subpath comes from the
 * Streams/invites/subpath config value, with {{app}} interpolated
 * from the Q/app config value.
 * @static
 * @method invitationsPath
 * @param {String} userId The id of the inviting user
 * @return {String} the path, ending with the split user id
 */
Streams.invitationsPath = function _Streams_invitationsPath(userId) {
	// Keys are passed as an array followed by the default value,
	// the same way as every other Q.Config.get call in this file.
	var subpath = Q.Config.get(
		['Streams', 'invites', 'subpath'],
		'{{app}}/uploads/Streams/invitations'
	);
	return Q.app.FILES_DIR + '/' + subpath.interpolate({
		app: Q.Config.expect(['Q', 'app'])
	}) + '/' + Q.Utils.splitId(userId);
};
/**
 * Use this to check whether variable is a Q.Streams.Stream object
 * @static
 * @method isStream
 * @param {mixed} testing
 * @return {boolean}
 */
Streams.isStream = function (testing) {
	// Relies on the type name reported by Q.typeOf
	var typeName = Q.typeOf(testing);
	return typeName === "Q.Streams.Stream";
};

/**
 * Message handlers keyed by message type. Written by
 * Streams.messageHandler() and read by the 'post' listener
 * installed in Streams.listen().
 * @property _messageHandlers
 * @type object
 * @private
 */
var _messageHandlers = {};
/**
 * NOTE(review): not referenced anywhere in the visible part of this file --
 * confirm whether it is still needed.
 * @property _streams
 * @type object
 * @private
 */
var _streams = {};

/* * * */