- "use strict";
- /*jshint node:true */
- /**
- * Streams model
- * @module Streams
- * @main Streams
- */
- var Q = require('Q');
- var fs = require('fs');
-
/**
 * Holder for the static methods of the Streams model.
 * The function itself does nothing; everything is attached to it as properties.
 * @class Streams
 * @extends Base.Streams
 * @static
 */
function Streams() {}
module.exports = Streams;

// Mix the generated base model into Streams
var Base_Streams = require('Base/Streams');
Q.mixin(Streams, Base_Streams);

/*
 * Static methods for the models -- the ones that don't strongly pertain
 * to a particular row or table -- are assigned below as properties
 * of the Streams object.
 */

// This module reads Q.plugins.Users below, so refuse to load without it
if (!Q.plugins.Users) {
    throw new Q.Exception("Streams: Users plugin is required");
}

var util = require('util');
var Db = Q.require('Db');
var Users = Q.plugins.Users;
// Socket server returned by Users.Socket.listen(); assigned in Streams.listen()
var socket = null;

Q.makeEventEmitter(Streams);
-
- /**
- * Read levels
- * @property READ_LEVEL
- * @type Object
- */
- /**
- * Can't see the stream
- * @property READ_LEVEL.none
- * @type integer
- * @default 0
- * @final
- */
- /**
- * Can see icon and title
- * @property READ_LEVEL.see
- * @type integer
- * @default 10
- * @final
- */
- /**
- * Can see the stream's content
- * @property READ_LEVEL.content
- * @type integer
- * @default 20
- * @final
- */
- /**
- * Can see relations to other streams
- * @property READ_LEVEL.relations
- * @type integer
- * @default 25
- * @final
- */
- /**
- * Can see participants in the stream
- * @property READ_LEVEL.participants
- * @type integer
- * @default 30
- * @final
- */
- /**
- * Can play stream in a player
- * @property READ_LEVEL.messages
- * @type integer
- * @default 40
- * @final
- */
- /**
- * Max read level
- * @property READ_LEVEL.max
- * @type integer
- * @default 40
- * @final
- */
Streams.READ_LEVEL = {
    none: 0,            // can't see the stream
    see: 10,            // can see icon and title
    content: 20,        // can preview stream and its content
    relations: 25,      // can see relations to other streams
    participants: 30,   // can see participants in the stream
    messages: 40,       // can play stream in a player
    max: 40             // max read level
};
- /**
- * Write levels
- * @property WRITE_LEVEL
- * @type Object
- */
- /**
- * Cannot affect stream or participants list
- * @property WRITE_LEVEL.none
- * @type integer
- * @default 0
- * @final
- */
- /**
- * Can become a participant, chat, and leave
- * @property WRITE_LEVEL.join
- * @type integer
- * @default 10
- * @final
- */
- /**
- * Can vote for a relation message posted to the stream.
- * @property WRITE_LEVEL.vote
- * @type integer
- * @default 13
- * @final
- */
- /**
- * Can post messages, but manager must approve
- * @property WRITE_LEVEL.postPending
- * @type integer
- * @default 18
- * @final
- */
- /**
- * Can post messages which appear immediately
- * @property WRITE_LEVEL.post
- * @type integer
- * @default 20
- * @final
- */
- /**
- * Can post messages relating other streams to this one
- * @property WRITE_LEVEL.relate
- * @type integer
- * @default 23
- * @final
- */
- /**
- * Can update properties of relations directly
- * @property WRITE_LEVEL.relations
- * @type integer
- * @default 25
- * @final
- */
- /**
- * Can post messages requesting edits of stream
- * @property WRITE_LEVEL.suggest
- * @type integer
- * @default 28
- * @final
- */
- /**
- * Can post messages to edit stream content immediately
- * @property WRITE_LEVEL.edit
- * @type integer
- * @default 30
- * @final
- */
- /**
- * Can post a message requesting to close the stream
- * @property WRITE_LEVEL.closePending
- * @type integer
- * @default 35
- * @final
- */
- /**
- * Don't delete, just prevent any new changes to stream
- * however, joining and leaving is still ok
- * @property WRITE_LEVEL.close
- * @type integer
- * @default 40
- * @final
- */
- /**
- * Max write level
- * @property WRITE_LEVEL.max
- * @type integer
- * @default 40
- * @final
- */
Streams.WRITE_LEVEL = {
    none: 0,            // cannot affect stream or participants list
    join: 10,           // can become a participant, chat, and leave
    vote: 13,           // can vote for a relation message posted to the stream
    postPending: 18,    // can post messages which require manager's approval
    post: 20,           // can post messages which take effect immediately
    relate: 23,         // can relate other streams to this one
    relations: 25,      // can update properties of relations directly
    suggest: 28,        // can suggest edits of stream
    edit: 30,           // can edit stream content immediately
    closePending: 35,   // can post a message requesting to close the stream
    close: 40,          // don't delete, just prevent any new changes to stream;
                        // however, joining and leaving is still ok
    max: 40             // max write level
};
- /**
- * Admin levels
- * @property ADMIN_LEVEL
- * @type Object
- */
- /**
- * Cannot do anything related to admin / users
- * @property ADMIN_LEVEL.none
- * @type integer
- * @default 0
- * @final
- */
- /**
- * Can prove things about the stream's content or participants
- * @property ADMIN_LEVEL.tell
- * @type integer
- * @default 10
- * @final
- */
- /**
- * Able to create invitations for others, granting access
- * and permissions up to what they themselves have
- * @property ADMIN_LEVEL.invite
- * @type integer
- * @default 20
- * @final
- */
- /**
- * Can approve posts, and give people any adminLevel < 'manage'
- * @property ADMIN_LEVEL.manage
- * @type integer
- * @default 30
- * @final
- */
- /**
- * Can give people any adminLevel <= 'own'
- * @property ADMIN_LEVEL.own
- * @type integer
- * @default 40
- * @final
- */
- /**
- * Max admin level
- * @property ADMIN_LEVEL.max
- * @type integer
- * @default 40
- * @final
- */
Streams.ADMIN_LEVEL = {
    none: 0,        // cannot do anything related to admin / users
    tell: 10,       // can prove things about the stream's content or participants
    invite: 20,     // able to create invitations for others, granting access
    manage: 30,     // can approve posts and give people any adminLevel < 30
    own: 40,        // can give people any adminLevel <= 40
    max: 40         // max admin level
};
- /**
- * Access sources
- * @property ACCESS_SOURCES
- * @type object
- */
- /**
- * Public access
- * @config ACCESS_SOURCES['public']
- * @type integer
- * @default 0
- * @final
- */
- /**
- * From contact
- * @config ACCESS_SOURCES['contact']
- * @type integer
- * @default 1
- * @final
- */
- /**
- * Direct access
- * @config ACCESS_SOURCES['direct']
- * @type integer
- * @default 2
- * @final
- */
- /**
- * Inherited public access
- * @config ACCESS_SOURCES['inherited_public']
- * @type integer
- * @default 3
- * @final
- */
- /**
- * Inherited from contact
- * @config ACCESS_SOURCES['inherited_contact']
- * @type integer
- * @default 4
- * @final
- */
- /**
- * Inherited direct access
- * @config ACCESS_SOURCES['inherited_direct']
- * @type integer
- * @default 5
- * @final
- */
Streams.ACCESS_SOURCES = {
    "public": 0,
    "contact": 1,
    "direct": 2,
    "inherited_public": 3,
    "inherited_contact": 4,
    "inherited_direct": 5
};

// Constructors registered through Streams.define(), keyed by normalized stream type
Streams.defined = {};
-
/**
 * Registers a constructor for a stream type.
 * The generated constructor first applies the constructors mixed in from
 * Streams.Stream and then your own ctor, with the same arguments.
 * @static
 * @method define
 * @param {String|Object} type The type of the stream, e.g. "Streams/chat".
 *  You can also pass an object of {type: ctor} pairs; each pair is then
 *  defined in turn (without methods) and nothing is returned.
 * @param {Function} ctor Your stream's constructor. Must be a function.
 * @param {Object} [methods] An optional hash of methods copied onto the prototype
 *  of the generated constructor. You can also override methods of the
 *  Stream object, such as "url".
 * @return {Function} The generated constructor, which is also stored in
 *  Streams.defined under the normalized type
 * @throws {Q.Error} If ctor is not a function
 */
Streams.define = function (type, ctor, methods) {
    if (typeof type === 'object') {
        // Batch form: define every type listed in the map
        for (var key in type) {
            Streams.define(key, type[key]);
        }
        return;
    }
    var normalizedType = Q.normalize(type);
    if (typeof ctor !== 'function') {
        throw new Q.Error("Streams.Stream.define requires ctor to be a function");
    }
    function CustomStreamConstructor() {
        // base constructors run first, then the custom one
        CustomStreamConstructor.constructors.apply(this, arguments);
        ctor.apply(this, arguments);
    }
    Q.mixin(CustomStreamConstructor, Streams.Stream);
    Q.extend(CustomStreamConstructor.prototype, methods);
    Streams.defined[normalizedType] = CustomStreamConstructor;
    return CustomStreamConstructor;
};
-
/**
 * Start internal listener for Streams plugin. Accepts messages such as<br/>
 * "Streams/Stream/join",
 * "Streams/Stream/leave",
 * "Streams/Stream/create",
 * "Streams/Stream/remove",
 * "Streams/Message/post",
 * "Streams/Message/postMessages",
 * "Streams/Stream/invite"
 *
 * If the "Q"/"node" config is present, it also starts the external socket server
 * and lets connected socket clients observe streams through the
 * "Streams/observe" and "Streams/neglect" events.
 * @method listen
 * @static
 * @param {Object} options={} So far no options are implemented.
 * @return {Boolean} Whether the socket server has started. This is false when
 *  the "Q"/"node" config is missing.
 * @throws {Q.Exception} If no host or port is configured for the socket server
 */
Streams.listen = function (options) {

    // Start internal server and route its requests to our handler
    var server = Q.listen();
    server.attached.express.post('/Q/node', Streams_request_handler);

    // Start external socket server, but only if node is configured
    if (!Q.Config.get(['Q', 'node'])) {
        return false;
    }
    // Streams/node/* takes precedence, Q/node/* is the fallback
    var pubHost = Q.Config.get(['Streams', 'node', 'host'], Q.Config.get(['Q', 'node', 'host'], null));
    var pubPort = Q.Config.get(['Streams', 'node', 'port'], Q.Config.get(['Q', 'node', 'port'], null));

    if (pubHost === null) {
        throw new Q.Exception("Streams: Missing config field: Streams/node/host");
    }
    if (pubPort === null) {
        throw new Q.Exception("Streams: Missing config field: Streams/node/port");
    }

    // Handle messages being posted to streams
    Streams.Stream.on('post', function (stream, byUserId, msg, clientId) {
        if (!stream) {
            return console.error("Streams.Stream.on POST: invalid stream!!!");
        }
        // handler registered through Streams.messageHandler(), if any
        var handler = _messageHandlers[msg.fields.type];
        if (handler) {
            handler.call(this, msg);
        }
        Streams.Stream.emit('post/'+msg.fields.type, stream, byUserId, msg);
        stream.notifyParticipants('Streams/post', byUserId, msg);
    });

    /**
     * @property socketServer
     * @type {SocketNamespace}
     * @private
     */
    socket = Users.Socket.listen({
        host: pubHost,
        port: pubPort,
        https: Q.Config.get(['Q', 'node', 'https'], false) || {}
    });

    // Reports "not authorized" through the optional acknowledgement callback
    function _notAuthorized(fn) {
        return fn && fn({
            type: 'Users.Exception.NotAuthorized',
            message: 'not authorized'
        });
    }

    socket.io.of('/Users').on('connection', function (client) {
        Q.log("Socket.IO client connected " + client.id);
        if (client.alreadyListeningStreams) {
            return; // handlers were already attached to this client
        }
        client.alreadyListeningStreams = true;

        client.on('Streams/observe',
        function (sessionId, clientId, publisherId, streamName, fn) {
            if (!_validateSessionId(sessionId, fn)) {
                return;
            }
            if (typeof publisherId !== 'string'
            || typeof streamName !== 'string') {
                return fn && fn({
                    type: 'Streams.Exception.BadArguments',
                    message: 'Bad arguments'
                });
            }
            var observer = Q.getObject(
                [publisherId, streamName, client.id], Streams.observers
            );
            if (observer) {
                return fn && fn(null, true); // already observing
            }
            // Access is tested with an empty user id
            Streams.fetchOne('', publisherId, streamName, function (err, stream) {
                if (err || !stream) {
                    return _notAuthorized(fn);
                }
                stream.testReadLevel('messages', function (err, allowed) {
                    if (err || !allowed) {
                        return _notAuthorized(fn);
                    }
                    var clients = Q.getObject(
                        [publisherId, streamName], Streams.observers, null, {}
                    );
                    var max = Streams.Stream.getConfigField(
                        stream.fields.type,
                        'observersMax'
                    );
                    // Reject only once the configured maximum has been reached.
                    // (The previous test, ">= max - 1", stopped one observer short,
                    // so observersMax = 1 could never admit anyone.)
                    if (max && Object.keys(clients).length >= max) {
                        return fn && fn({
                            type: 'Streams.Exception.TooManyObservers',
                            message: 'too many observers already'
                        });
                    }
                    // Record the relationship in both directions
                    clients[client.id] = client;
                    Q.setObject(
                        [client.id, publisherId, streamName], true, Streams.observing
                    );
                    return fn && fn(null, true);
                });
            });
        });

        client.on('Streams/neglect',
        function (sessionId, clientId, publisherId, streamName, fn) {
            if (!_validateSessionId(sessionId, fn)) {
                return;
            }
            var observed = Streams.observers;
            if (!Q.getObject([publisherId, streamName, client.id], observed)) {
                return fn && fn(null, false); // wasn't observing
            }
            delete observed[publisherId][streamName][client.id];
            // Guarded, so a missing reverse entry cannot throw
            var mine = Q.getObject([client.id, publisherId], Streams.observing);
            if (mine) {
                delete mine[streamName];
            }
            return fn && fn(null, true);
        });

        client.on('disconnect', function () {
            var observing = Streams.observing[client.id];
            if (!observing) {
                return;
            }
            // Remove this client from every stream it was observing
            for (var publisherId in observing) {
                for (var streamName in observing[publisherId]) {
                    var clients = Q.getObject(
                        [publisherId, streamName], Streams.observers
                    );
                    if (clients) {
                        delete clients[client.id];
                    }
                }
            }
            delete Streams.observing[client.id];
        });
    });
    return true;
};
-
/**
 * Socket.io clients observing streams,
 * stored as observers[publisherId][streamName][client.id] = client
 * @property observers
 * @type {Object}
 */
Streams.observers = {};

/**
 * Streams that each socket.io client is observing,
 * stored as observing[client.id][publisherId][streamName] = true
 * @property observing
 * @type {Object}
 */
Streams.observing = {};
-
/**
 * Checks a session id by decoding it with Users.Session.decodeId().
 * The id is accepted when the first element of the decoded result is truthy.
 * @method _validateSessionId
 * @private
 * @param {String} sessionId The session id received from the client
 * @param {Function} [fn] Called with a 'Users.Exception.BadSessionId' error object
 *  when the session id is rejected
 * @return {Boolean} Whether the session id was accepted
 */
function _validateSessionId(sessionId, fn) {
    var decoded = Users.Session.decodeId(sessionId);
    if (!decoded[0]) {
        if (fn) {
            fn({
                type: 'Users.Exception.BadSessionId',
                message: 'bad session id'
            });
        }
        return false;
    }
    return true;
}
-
/**
 * Express handler for the internal requests posted to "/Q/node".
 * Dispatches on the "Q/method" field of the request body. Requests without
 * that field, and methods that aren't handled here, are passed to next().
 * "Streams/Stream/invite" is the only method that sends a response itself:
 * it answers {data: true} (or {data: false} if the input can't be parsed)
 * and then saves the invites in the background, without calling next().
 * @method Streams_request_handler
 * @private
 * @param {Object} req The request; req.body holds the already-parsed fields
 * @param {Object} res The response
 * @param {Function} next Passes control to the next handler
 */
function Streams_request_handler (req, res, next) {
    var parsed = req.body;
    if (!parsed || !parsed['Q/method']) {
        return next();
    }
    var participant, msg, posted, streams, k;
    var userId, sessionId, clients, cid;
    var userIds, invitingUserId, appUrl;
    var readLevel, writeLevel, adminLevel, permissions, displayName, expireTime, logKey;
    var clientId = parsed["Q.clientId"];
    // The stream, if any, arrives as JSON in the "stream" field
    var stream = parsed.stream
        && Streams.Stream.construct(JSON.parse(parsed.stream), true);
    switch (parsed['Q/method']) {
        case 'Users/device':
            break;
        case 'Users/logout':
            userId = parsed.userId;
            sessionId = parsed.sessionId;
            if (userId && sessionId) {
                // disconnect the sockets which belong to the session being logged out
                clients = Users.clients[userId];
                for (cid in clients) {
                    if (clients[cid].sessionId === sessionId) {
                        clients[cid].disconnect();
                    }
                }
            }
            Users.pushNotifications(userId, {
                badge: 0
            });
            break;
        case 'Streams/Stream/join':
            participant = new Streams.Participant(JSON.parse(parsed.participant));
            participant.fillMagicFields();
            userId = participant.fields.userId;
            if (Q.Config.get(['Streams', 'logging'], false)) {
                Q.log('Streams.listen: Streams/Stream/join {'
                    + '"publisherId": "' + stream.fields.publisherId
                    + '", "name": "' + stream.fields.name
                    + '"}'
                );
            }
            // inform user's clients about change
            Users.Socket.emitToUser(userId, 'Streams/join', participant);
            Streams.Stream.emit('join', stream, userId, clientId);
            break;
        case 'Streams/Stream/visit':
            participant = JSON.parse(parsed.participant);
            userId = participant.userId;
            Streams.Stream.emit('visit', stream, userId, clientId);
            break;
        case 'Streams/Stream/leave':
            participant = new Streams.Participant(JSON.parse(parsed.participant));
            participant.fillMagicFields();
            userId = participant.fields.userId;
            if (Q.Config.get(['Streams', 'logging'], false)) {
                Q.log('Streams.listen: Streams/Stream/leave {'
                    + '"publisherId": "' + stream.fields.publisherId
                    + '", "name": "' + stream.fields.name
                    + '"}'
                );
            }
            // inform user's clients about change
            Users.Socket.emitToUser(userId, 'Streams/leave', participant);
            Streams.Stream.emit('leave', stream, userId, clientId);
            break;
        case 'Streams/Stream/remove':
            if (Q.Config.get(['Streams', 'logging'], false)) {
                Q.log('Streams.listen: Streams/Stream/remove {'
                    + '"publisherId": "' + stream.fields.publisherId
                    + '", "name": "' + stream.fields.name
                    + '"}'
                );
            }
            stream.notifyParticipants('Streams/remove', null, {
                publisherId: stream.fields.publisherId,
                name: stream.fields.name
            });
            Streams.Stream.emit('remove', stream, clientId);
            break;
        case 'Streams/Stream/create':
            if (Q.Config.get(['Streams', 'logging'], false)) {
                Q.log('Streams.listen: Streams/Stream/create {'
                    + '"publisherId": "' + stream.fields.publisherId
                    + '", "name": "' + stream.fields.name
                    + '"}'
                );
            }
            Streams.Stream.emit('create', stream, clientId);
            // no need to notify anyone
            break;
        case 'Streams/Message/post':
            msg = Streams.Message.construct(JSON.parse(parsed.message), true);
            msg.fillMagicFields();
            if (Q.Config.get(['Streams', 'logging'], false)) {
                Q.log('Streams.listen: Streams/Message/post {'
                    + '"publisherId": "' + stream.fields.publisherId
                    + '", "name": "' + stream.fields.name
                    + '", "msg.type": "' + msg.fields.type
                    + '"}'
                );
            }
            Streams.Stream.emit('post', stream, msg.fields.byUserId, msg, clientId);
            break;
        case 'Streams/Message/postMessages':
            posted = JSON.parse(parsed.posted);
            streams = parsed.streams && JSON.parse(parsed.streams);
            if (!streams) break;
            for (k in posted) {
                msg = Streams.Message.construct(posted[k], true);
                msg.fillMagicFields();
                // each message names its own stream, looked up in the streams map
                stream = Streams.Stream.construct(
                    streams[msg.fields.publisherId][msg.fields.streamName], true
                );
                if (Q.Config.get(['Streams', 'logging'], false)) {
                    Q.log('Streams.listen: Streams/Message/post {'
                        + '"publisherId": "' + stream.fields.publisherId
                        + '", "name": "' + stream.fields.name
                        + '", "msg.type": "' + msg.fields.type
                        + '"}'
                    );
                }
                Streams.Stream.emit('post', stream, msg.fields.byUserId, msg, clientId);
            }
            break;
        case 'Streams/Stream/invite':
            try {
                userIds = JSON.parse(parsed.userIds);
                invitingUserId = parsed.invitingUserId;
                appUrl = parsed.appUrl;
                readLevel = parsed.readLevel || null;
                writeLevel = parsed.writeLevel || null;
                adminLevel = parsed.adminLevel || null;
                permissions = parsed.permissions || null;
                displayName = parsed.displayName || '';
                // expireTime arrives in seconds, Date wants milliseconds
                expireTime = parsed.expireTime ? new Date(parsed.expireTime*1000) : null;
            } catch (e) {
                return res.send({data: false});
            }
            res.send({data: true});
            logKey = Q.Config.get(['Streams', 'logging'], false);
            if (logKey) {
                Q.log(
                    'Streams.listen: Streams/Stream/invite {'
                    + '"publisherId": "' + stream.fields.publisherId
                    + '", "name": "' + stream.fields.name
                    + '", "userIds": ' + parsed.userIds
                    + '}',
                    logKey
                );
            }

            // The response has already been sent, so every path from here on
            // has to return instead of falling through to next().
            if (expireTime && expireTime <= new Date()) {
                return; // the invite has already expired, nothing to save
            }

            persist();

            return;
        default:
            break;
    }
    return next();

    // Saves, for every invited user, the Invite, Invited and Participant rows,
    // then posts a "Streams/invite" message to that user's "Streams/invited" stream.
    function persist () {

        Q.each(userIds, function (i, userId) {
            var token = null;
            var invite;

            // TODO: Change this to a getter, so that we can do throttling in case there are too many userIds
            (new Streams.Participant({
                "publisherId": stream.fields.publisherId,
                "streamName": stream.fields.name,
                "userId": userId,
                "state": "participating"
            })).retrieve(_participant);

            function _participant(err, rows) {
                if (rows && rows.length) {
                    // User is already a participant in the stream.
                    return;
                }
                (new Streams.Invite({
                    "userId": userId,
                    "state": "pending",
                    "publisherId": stream.fields.publisherId,
                    "streamName": stream.fields.name,
                    "invitingUserId": invitingUserId,
                    "displayName": displayName,
                    "appUrl": appUrl,
                    "readLevel": readLevel,
                    "writeLevel": writeLevel,
                    "adminLevel": adminLevel,
                    "permissions": permissions,
                    "expireTime": expireTime
                })).save(_inviteSaved);
            }

            function _inviteSaved(err) {
                if (err) {
                    Q.log("ERROR: Failed to save Streams.Invite for user '"+userId+"' during invite");
                    Q.log(err);
                    return;
                }
                token = this.fields.token;
                invite = this;
                // now ready to save Streams.Invited row
                (new Streams.Invited({
                    "token": token,
                    "userId": userId,
                    "state": "pending",
                    "expireTime": expireTime
                })).save(_invitedSaved);
            }

            function _invitedSaved(err) {
                if (err) {
                    Q.log("ERROR: Failed to save Streams.Invited for user '"+userId+"' during invite");
                    Q.log(err);
                    return;
                }
                (new Streams.Participant({
                    "publisherId": stream.fields.publisherId,
                    "streamName": stream.fields.name,
                    "streamType": stream.fields.type,
                    "userId": userId,
                    "state": "invited",
                    "reason": ""
                })).save(true, _participantSaved);

                // Write some files, if requested
                // SECURITY: Here we trust the input, which should only be sent internally
                if (parsed.template) {
                    new Users.User({id: userId})
                    .retrieve(function () {
                        var fields = Q.extend({}, parsed, {
                            stream: stream,
                            user: this,
                            invite: invite,
                            link: invite.url(),
                            app: Q.app,
                            communityId: Users.communityId(),
                            communityName: Users.communityName(),
                            appRootUrl: Q.Config.expect(['Q', 'web', 'appRootUrl'])
                        });
                        var html = Q.Handlebars.render(parsed.template, fields);
                        var path = Streams.invitationsPath(invitingUserId)
                            +'/'+parsed.batchName;
                        var filename = path + '/'
                            + Q.normalize(stream.fields.publisherId) + '-'
                            + Q.normalize(stream.fields.name) + '-'
                            + this.fields.id + '.html';
                        fs.writeFile(filename, html, function (err) {
                            if (err) {
                                Q.log(err);
                            }
                        });
                    });
                }
            }

            function _participantSaved(err) {
                if (err) {
                    Q.log("ERROR: Failed to save Streams.Participant for user '"+userId+"' during invite");
                    Q.log(err);
                    return;
                }

                // Now post a message to Streams/invited stream
                Streams.fetchOne(invitingUserId, userId, 'Streams/invited', _stream);
            }

            function _stream(err, invited) {
                // Streams.fetchOne reports a missing stream as (null, null),
                // so a missing stream is treated the same as an error
                if (err || !invited) {
                    Q.log("ERROR: Failed to get invited stream for user '"+userId+"' during invite");
                    if (err) {
                        Q.log(err);
                    }
                    return;
                }
                Streams.Stream.emit('invite', invited.getFields(), userId, stream);
                if (!invited.testWriteLevel('post')) {
                    Q.log("ERROR: Not authorized to post to invited stream for user '"+userId+"' during invite");
                    return;
                }
                var invitedUrl = Streams.invitedUrl(token);
                displayName = displayName || "Someone";
                var msg = {
                    publisherId: invited.fields.publisherId,
                    streamName: invited.fields.name,
                    byUserId: invitingUserId,
                    type: 'Streams/invite',
                    sentTime: new Db.Expression("CURRENT_TIMESTAMP"),
                    state: 'posted',
                    content: displayName + " invited you to " + invitedUrl,
                    instructions: JSON.stringify({
                        token: token,
                        displayName: displayName,
                        appUrl: appUrl,
                        invitedUrl: invitedUrl,
                        type: stream.fields.type,
                        title: stream.fields.title,
                        content: stream.fields.content
                    })
                };
                invited.post(msg, function (err) {
                    if (err) {
                        Q.log("ERROR: Failed to save message for user '"+userId+"' during invite");
                        Q.log(err);
                    }
                });
            }
        });

    }
}
-
// A socket.io client has connected. When the user was not online before,
// post a "Streams/connected" message to their "Streams/participating" stream.
Users.on('connected', function (client, wasOnline) {
    if (wasOnline) {
        return;
    }
    var participating = new Streams.Stream({
        publisherId: client.userId,
        name: 'Streams/participating'
    });
    // NOTE(review): the user id is passed here as a separate first argument, while
    // the other post() calls in this file pass a single fields object containing
    // byUserId -- confirm that post() supports both call shapes.
    participating.post(client.userId, {
        type: 'Streams/connected'
    }, function (err) {
        if (err) console.error(err);
    });
});
-
// The user has disconnected: post a "Streams/disconnected" message
// to their "Streams/participating" stream and log the disconnect.
Users.on('disconnected', function (userId) {
    var participating = new Streams.Stream({
        publisherId: userId,
        name: 'Streams/participating'
    });
    var message = {
        byUserId: userId,
        type: 'Streams/disconnected'
    };
    participating.post(message, function (err) {
        if (err) console.error(err);
        // logged whether or not the post succeeded
        Q.log('User disconnected: ' + userId);
    });
});
-
/**
 * Retrieve the participants of a stream from the database
 * @method getParticipants
 * @static
 * @param {String} publisherId The publisher Id
 * @param {String} streamName The name of the stream
 * @param {Function} [callback=null] Callback receives a map of {userId: participant} pairs.
 *  If the query fails, the error is logged and the callback receives an empty map.
 *  Nothing is queried if there is no callback.
 */
Streams.getParticipants = function (publisherId, streamName, callback) {
    if (!callback) return;
    var criteria = {
        publisherId: publisherId,
        streamName: streamName
    };
    Streams.Participant.SELECT('*').where(criteria).execute(function (err, rows) {
        var participants = {};
        if (err) {
            Q.log(err);
            callback(participants);
            return;
        }
        // index the rows by user id
        for (var i = 0; i < rows.length; ++i) {
            var row = rows[i];
            participants[row.fields.userId] = row;
        }
        callback(participants);
    });
};
-
/**
 * Retrieve socket.io clients registered to observe the stream
 * by sending "Streams/observe" events through the socket.
 * @method getObservers
 * @static
 * @param {String} publisherId The publisher Id
 * @param {String} streamName The name of the stream
 * @param {Function} [callback=null] Callback receives a map of {clientId: socketClient} pairs,
 *  which is empty when nobody observes the stream
 */
Streams.getObservers = function (publisherId, streamName, callback) {
    if (!callback) {
        return;
    }
    var found = Q.getObject([publisherId, streamName], Streams.observers);
    callback(found || {});
};
-
/**
 * Retrieve streams with calculated access rights
 * @method fetch
 * @static
 * @param {String} asUserId
 *  The user id to calculate access rights
 * @param {String} publisherId
 *  The publisher Id
 * @param {String|Array|Db.Range} streamName
 *  The name of the stream, or an array of names, or a Db.Range.
 *  A string ending in "/" is turned into a Db.Range built from that prefix.
 * @param callback=null {function}
 *  Callback receives (err, streams) as parameters. On success, streams holds
 *  the results indexed by stream name, or is an empty array if no rows matched.
 *  Nothing happens if there is no callback.
 * @param {String} [fields='*']
 *  Comma delimited list of fields to retrieve in the stream.
 *  Must include at least "publisherId" and "name",
 *  since they make up the primary key of the stream table.
 *  You can skip this argument if you want, and pass options in its place.
 * @param {Object} [options={}]
 *  Provide additional query options like 'limit', 'offset', 'orderBy', 'where' etc.
 *  @see Db_Query_Mysql::options().
 */
Streams.fetch = function (asUserId, publisherId, streamName, callback, fields, options) {
    if (!callback) return;
    if (!publisherId || !streamName) {
        return callback(new Error("Wrong arguments"));
    }
    var isPrefix = typeof streamName.charAt === 'function'
        && streamName.charAt(streamName.length-1) === '/';
    if (isPrefix) {
        // the upper bound replaces the trailing "/" with "0"
        streamName = new Db.Range(streamName, true, false, streamName.slice(0, -1)+'0');
    }
    if (Q.isPlainObject(fields)) {
        // fields was skipped, and options were passed in its place
        options = fields;
        fields = '*';
    }
    if (!fields) {
        fields = '*';
    }
    var query = Streams.Stream.SELECT(fields)
        .where({publisherId: publisherId, name: streamName})
        .options(options);
    query.execute(function (err, rows) {
        if (err) {
            return callback(err);
        }
        if (!rows.length) {
            return callback(null, []);
        }
        var names = rows.map(function (row) {
            return row.fields.name;
        });
        // runs once the access of every stream has been calculated
        var pipe = new Q.Pipe(names, function (params, subjects) {
            for (var name in params) {
                var error = params[name][0];
                if (error) {
                    callback(error); // there was an error
                    return;
                }
            }
            callback(null, subjects); // success
        });
        rows.forEach(function (row) {
            row.calculateAccess(asUserId, pipe.fill(row.fields.name));
        });
    });
};
-
/**
 * Retrieve stream with calculated access rights
 * @method fetchOne
 * @static
 * @param {String} asUserId
 *  The user id to calculate access rights
 * @param {String} publisherId
 *  The publisher Id
 * @param {String} streamName
 *  The name of the stream
 * @param {Function} [callback=null]
 *  Callback receives the (err, stream) as parameters.
 *  If no such stream exists, it receives (null, null).
 *  On success, "this" inside the callback is the stream, too.
 *  Nothing happens if there is no callback.
 * @param {String} [fields='*']
 *  Comma delimited list of fields to retrieve in the stream.
 *  Must include at least "publisherId" and "name",
 *  since they make up the primary key of the stream table.
 *  You can skip this argument if you want, and pass options in its place.
 * @param {Object} [options={}]
 *  Provide additional query options like 'limit', 'offset', 'orderBy', 'where' etc.
 *  @see Db_Query_Mysql::options().
 */
Streams.fetchOne = function (asUserId, publisherId, streamName, callback, fields, options) {
    if (!callback) return;
    if (!publisherId || !streamName
    || typeof publisherId !== 'string'
    || typeof streamName !== 'string') {
        return callback(new Error("Wrong arguments"));
    }
    if (Q.isPlainObject(fields)) {
        // fields was skipped, and options were passed in its place
        options = fields;
        fields = '*';
    }
    // Honor the requested fields, the same way Streams.fetch does
    fields = fields || '*';
    Streams.Stream.SELECT(fields)
    .where({publisherId: publisherId, name: streamName})
    .options(options)
    .limit(1).execute(function (err, res) {
        if (err) {
            return callback(err);
        }
        if (!res.length) {
            // Must return here: there is no res[0] to work with
            return callback(null, null);
        }
        var stream = res[0];
        stream.calculateAccess(asUserId, function (err) {
            if (err) {
                // Don't hand out a stream whose access couldn't be calculated
                return callback(err);
            }
            callback.call(stream, null, stream);
        });
    });
};
-
/**
 * Register a handler for a type of message, or look up the registered one.
 * Streams.listen() calls the handler with each posted message of that type.
 * @method messageHandler
 * @static
 * @param {String} msgType
 *  Type of message
 * @param {Function} [callback]
 *  The handler for messages of this type.
 *  Leave it undefined to just get the handler that's currently registered.
 * @return {Function|undefined}
 *  The currently registered handler, but only when callback is undefined
 * @throws {Q.Exception} If callback is defined but isn't a function
 */
Streams.messageHandler = function (msgType, callback) {
    var isGetter = (callback === undefined);
    if (isGetter) {
        return _messageHandlers[msgType];
    }
    if (typeof callback === 'function') {
        _messageHandlers[msgType] = callback;
        return;
    }
    throw new Q.Exception("Streams: callback passed to messageHandler is not a function");
};
-
/**
 * Calculate the url of a stream's icon
 * @static
 * @method iconUrl
 * @param {String} icon the value of the stream's "icon" field
 * @param {Number|String} [size=40] the size of the icon to render. Defaults to 40.
 *  ".png" is appended unless the size already contains a dot.
 * @return {String} the url, or an empty string (after a warning) if icon is empty
 */
Streams.iconUrl = function (icon, size) {
    if (!icon) {
        console.warn("Streams.iconUrl: icon is empty");
        return '';
    }
    if (!size || size === true) {
        size = '40';
    }
    if (String(size).indexOf('.') < 0) {
        size = size + '.png'; // no extension was given
    }
    var src = Q.interpolateUrl(icon + '/' + size);
    // Full urls and icons starting with "{{" are returned as they are;
    // anything else is looked up among the Streams plugin's icons
    if (src.isUrl() || icon.substr(0, 2) === '{{') {
        return src;
    }
    return Q.url('{{Streams}}/img/icons/'+src);
};
-
/**
 * Calculate the url which an invite with the given token leads to
 * @static
 * @method invitedUrl
 * @param {String} token the token of the invite
 * @return {String} the url made from the "Streams"/"invites"/"baseUrl" config
 *  (which defaults to "i"), followed by "/" and the token
 */
Streams.invitedUrl = function _Streams_invitedUrl(token) {
    var baseUrl = Q.Config.get(['Streams', 'invites', 'baseUrl'], "i");
    return Q.url(baseUrl) + "/" + token;
};
-
/**
 * Calculate the directory holding the invitation files generated on behalf of a user
 * @static
 * @method invitationsPath
 * @param {String} userId the id of the inviting user
 * @return {String} Q.app.FILES_DIR, followed by the "Streams"/"invites"/"subpath" config
 *  (defaults to "{{app}}/uploads/Streams/invitations") with the app name
 *  interpolated, followed by the user id split by Q.Utils.splitId()
 */
Streams.invitationsPath = function _Streams_invitationsPath(userId) {
    // Pass the keys as an array followed by the default,
    // like every other Q.Config.get() call in this file
    var subpath = Q.Config.get(
        ['Streams', 'invites', 'subpath'],
        '{{app}}/uploads/Streams/invitations'
    );
    return Q.app.FILES_DIR + '/' + subpath.interpolate({
        app: Q.Config.expect(['Q', 'app'])
    }) + '/' + Q.Utils.splitId(userId);
};
/**
 * Use this to check whether a variable is a Q.Streams.Stream object,
 * as reported by Q.typeOf()
 * @static
 * @method isStream
 * @param {mixed} testing the value to check
 * @return {boolean} true if Q.typeOf(testing) is "Q.Streams.Stream"
 */
Streams.isStream = function (testing) {
    var type = Q.typeOf(testing);
    return type === "Q.Streams.Stream";
};
-
/**
 * Handlers registered through Streams.messageHandler(), indexed by message type
 * @property _messageHandlers
 * @type object
 * @private
 */
var _messageHandlers = {};

/**
 * Not referenced by the code visible in this file
 * @property _streams
 * @type object
 * @private
 */
var _streams = {};
-
- /* * * */
-
-