platform/plugins/Streams/classes/Streams.js - Q Platform
Show:

File: platform/plugins/Streams/classes/Streams.js

  1. "use strict";
  2. /*jshint node:true */
  3. /**
  4. * Streams model
  5. * @module Streams
  6. * @main Streams
  7. */
  8. var Q = require('Q');
  9. var fs = require('fs');
  10.  
  11. /**
  12. * Static methods for the Streams model
  13. * @class Streams
  14. * @extends Base.Streams
  15. * @static
  16. */
  17. function Streams() { }
  18. module.exports = Streams;
  19.  
  20. var Base_Streams = require('Base/Streams');
  21. Q.mixin(Streams, Base_Streams);
  22.  
  23.  
  24. /*
  25. * This is where you would place all the static methods for the models,
  26. * the ones that don't strongly pertain to a particular row or table.
  27. * Just assign them as methods of the Streams object.
  28. * * * */
  29.  
  30. if (!Q.plugins.Users) {
  31. throw new Q.Exception("Streams: Users plugin is required");
  32. }
  33.  
  34. var util = require('util');
  35. var Db = Q.require('Db');
  36. var Users = Q.plugins.Users;
  37. var socket = null;
  38.  
  39. Q.makeEventEmitter(Streams);
  40.  
  41. /**
  42. * Read levels
  43. * @property READ_LEVEL
  44. * @type Object
  45. */
  46. /**
  47. * Can't see the stream
  48. * @property READ_LEVEL.none
  49. * @type integer
  50. * @default 0
  51. * @final
  52. */
  53. /**
  54. * Can see icon and title
  55. * @property READ_LEVEL.see
  56. * @type integer
  57. * @default 10
  58. * @final
  59. */
  60. /**
  61. * Can see the stream's content
  62. * @property READ_LEVEL.content
  63. * @type integer
  64. * @default 20
  65. * @final
  66. */
  67. /**
  68. * Can see relations to other streams
  69. * @property READ_LEVEL.relations
  70. * @type integer
  71. * @default 25
  72. * @final
  73. */
  74. /**
  75. * Can see participants in the stream
  76. * @property READ_LEVEL.participants
  77. * @type integer
  78. * @default 30
  79. * @final
  80. */
  81. /**
  82. * Can play stream in a player
  83. * @property READ_LEVEL.messages
  84. * @type integer
  85. * @default 40
  86. * @final
  87. */
  88. /**
  89. * Max read level
  90. * @property READ_LEVEL.max
  91. * @type integer
  92. * @default 40
  93. * @final
  94. */
  95. Streams.READ_LEVEL = {
  96. 'none': 0, // can't see the stream
  97. 'see': 10, // can see icon and title
  98. 'content': 20, // can preview stream and its content
  99. 'relations': 25, // can see relations to other streams
  100. 'participants': 30, // can see participants in the stream
  101. 'messages': 40, // can play stream in a player
  102. 'max': 40 // max read level
  103. };
  104. /**
  105. * Write levels
  106. * @property WRITE_LEVEL
  107. * @type Object
  108. */
  109. /**
  110. * Cannot affect stream or participants list
  111. * @property WRITE_LEVEL.none
  112. * @type integer
  113. * @default 0
  114. * @final
  115. */
  116. /**
  117. * Can become a participant, chat, and leave
  118. * @property WRITE_LEVEL.join
  119. * @type integer
  120. * @default 10
  121. * @final
  122. */
  123. /**
  124. * Can vote for a relation message posted to the stream.
  125. * @property WRITE_LEVEL.vote
  126. * @type integer
  127. * @default 13
  128. * @final
  129. */
  130. /**
  131. * Can post messages, but manager must approve
  132. * @property WRITE_LEVEL.postPending
  133. * @type integer
  134. * @default 15
  135. * @final
  136. */
  137. /**
  138. * Can post messages which appear immediately
  139. * @property WRITE_LEVEL.messages
  140. * @type integer
  141. * @default 20
  142. * @final
  143. */
  144. /**
  145. * Can post messages relating other streams to this one
  146. * @property WRITE_LEVEL.relate
  147. * @type integer
  148. * @default 23
  149. * @final
  150. */
  151. /**
  152. * Can update properties of relations directly
  153. * @property WRITE_LEVEL.relations
  154. * @type integer
  155. * @default 25
  156. * @final
  157. */
  158. /**
  159. * Can post messages requesting edits of stream
  160. * @property WRITE_LEVEL.suggest
  161. * @type integer
  162. * @default 28
  163. * @final
  164. */
  165. /**
  166. * Can post messages to edit stream content immediately
  167. * @property WRITE_LEVEL.edit
  168. * @type integer
  169. * @default 30
  170. * @final
  171. */
  172. /**
  173. * Can post a message requesting to close the stream
  174. * @property WRITE_LEVEL.closePending
  175. * @type integer
  176. * @default 35
  177. * @final
  178. */
  179. /**
  180. * Don't delete, just prevent any new changes to stream
  181. * however, joining and leaving is still ok
  182. * @property WRITE_LEVEL.close
  183. * @type integer
  184. * @default 40
  185. * @final
  186. */
  187. /**
  188. * Max write level
  189. * @property WRITE_LEVEL.max
  190. * @type integer
  191. * @default 40
  192. * @final
  193. */
  194. Streams.WRITE_LEVEL = {
  195. 'none': 0, // cannot affect stream or participants list
  196. 'join': 10, // can become a participant, chat, and leave
  197. 'vote': 13, // can vote for a relation message posted to the stream
  198. 'postPending': 18, // can post messages which require manager's approval
  199. 'post': 20, // can post messages which take effect immediately
  200. 'relate': 23, // can relate other streams to this one
  201. 'relations': 25, // can update properties of relations directly
  202. 'suggest': 28, // can suggest edits of stream
  203. 'edit': 30, // can edit stream content immediately
  204. 'closePending': 35, // can post a message requesting to close the stream
  205. 'close': 40, // don't delete, just prevent any new changes to stream
  206. // however, joining and leaving is still ok
  207. 'max': 40 // max write level
  208. };
  209. /**
  210. * Admin levels
  211. * @property ADMIN_LEVEL
  212. * @type Object
  213. */
  214. /**
  215. * Cannot do anything related to admin / users
  216. * @property ADMIN_LEVEL.none
  217. * @type integer
  218. * @default 0
  219. * @final
  220. */
  221. /**
  222. * Can prove things about the stream's content or participants
  223. * @property ADMIN_LEVEL.tell
  224. * @type integer
  225. * @default 10
  226. * @final
  227. */
  228. /**
  229. * Able to create invitations for others, granting access
  230. * and permissions up to what they themselves have
  231. * @property ADMIN_LEVEL.invite
  232. * @type integer
  233. * @default 20
  234. * @final
  235. */
  236. /**
  237. * Can approve posts, and give people any adminLevel < 'manage'
  238. * @property ADMIN_LEVEL.manage
  239. * @type integer
  240. * @default 30
  241. * @final
  242. */
  243. /**
  244. * Can give people any adminLevel <= 'own'
  245. * @property ADMIN_LEVEL.own
  246. * @type integer
  247. * @default 40
  248. * @final
  249. */
  250. /**
  251. * Max admin level
  252. * @property ADMIN_LEVEL.max
  253. * @type integer
  254. * @default 40
  255. * @final
  256. */
  257. Streams.ADMIN_LEVEL = {
  258. 'none': 0, // cannot do anything related to admin / users
  259. 'tell': 10, // can prove things about the stream's content or participants
  260. 'invite': 20, // able to create invitations for others, granting access
  261. 'manage': 30, // can approve posts and give people any adminLevel < 30
  262. 'own': 40, // can give people any adminLevel <= 40
  263. 'max': 40 // max admin level
  264. };
  265. /**
  266. * Access sources
  267. * @property ACCESS_SOURCES
  268. * @type object
  269. */
  270. /**
  271. * Public access
  272. * @config ACCESS_SOURCES['public']
  273. * @type integer
  274. * @default 0
  275. * @final
  276. */
  277. /**
  278. * From contact
  279. * @config ACCESS_SOURCES['contact']
  280. * @type integer
  281. * @default 1
  282. * @final
  283. */
  284. /**
  285. * Direct access
  286. * @config ACCESS_SOURCES['direct']
  287. * @type integer
  288. * @default 2
  289. * @final
  290. */
  291. /**
  292. * Inherited public access
  293. * @config ACCESS_SOURCES['inherited_public']
  294. * @type integer
  295. * @default 3
  296. * @final
  297. */
  298. /**
  299. * Inherited from contact
  300. * @config ACCESS_SOURCES['inherited_contact']
  301. * @type integer
  302. * @default 4
  303. * @final
  304. */
  305. /**
  306. * Inherited direct access
  307. * @config ACCESS_SOURCES['inherited_direct']
  308. * @type integer
  309. * @default 5
  310. * @final
  311. */
  312. Streams.ACCESS_SOURCES = {
  313. 'public': 0,
  314. 'contact': 1,
  315. 'direct': 2,
  316. 'inherited_public': 3,
  317. 'inherited_contact': 4,
  318. 'inherited_direct': 5
  319. };
  320.  
  321. Streams.defined = {};
  322.  
  323. /**
  324. * Call this function to set a constructor for a stream type
  325. * @static
  326. * @method define
  327. * @param {String} type The type of the message, e.g. "Streams/chat/message"
  328. * @param {String|Function} ctor Your message's constructor, or path to a javascript file which will define it
  329. * @param {Object} methods An optional hash of methods. You can also override methods of the
  330. * Stream object, such as "url".
  331. */
  332. Streams.define = function (type, ctor, methods) {
  333. if (typeof type === 'object') {
  334. for (var t in type) {
  335. Streams.define(t, type[t]);
  336. }
  337. return;
  338. };
  339. type = Q.normalize(type);
  340. if (typeof ctor !== 'function') {
  341. throw new Q.Error("Streams.Stream.define requires ctor to be a function");
  342. }
  343. function CustomStreamConstructor() {
  344. CustomStreamConstructor.constructors.apply(this, arguments);
  345. ctor.apply(this, arguments);
  346. }
  347. Q.mixin(CustomStreamConstructor, Streams.Stream);
  348. Q.extend(CustomStreamConstructor.prototype, methods);
  349. return Streams.defined[type] = CustomStreamConstructor;
  350. };
  351.  
  352. /**
  353. * Start internal listener for Streams plugin. Accepts messages such as<br/>
  354. * "Streams/Stream/join",
  355. * "Streams/Stream/leave",
  356. * "Streams/Stream/create",
  357. * "Streams/Stream/remove",
  358. * "Streams/Message/post",
  359. * "Streams/Message/postMessages",
  360. * "Streams/Stream/invite"
  361. * @method listen
  362. * @static
  363. * @param {Object} options={} So far no options are implemented.
  364. * @return {Boolean} Whether the server has started
  365. */
  366. Streams.listen = function (options) {
  367.  
  368. // Start internal server
  369. var server = Q.listen();
  370. server.attached.express.post('/Q/node', Streams_request_handler);
  371.  
  372. // Start external socket server
  373. var node = Q.Config.get(['Q', 'node']);
  374. if (!node) {
  375. return false;
  376. }
  377. var pubHost = Q.Config.get(['Streams', 'node', 'host'], Q.Config.get(['Q', 'node', 'host'], null));
  378. var pubPort = Q.Config.get(['Streams', 'node', 'port'], Q.Config.get(['Q', 'node', 'port'], null));
  379.  
  380. if (pubHost === null) {
  381. throw new Q.Exception("Streams: Missing config field: Streams/node/host");
  382. }
  383. if (pubPort === null) {
  384. throw new Q.Exception("Streams: Missing config field: Streams/node/port");
  385. }
  386.  
  387. // Handle messages being posted to streams
  388. Streams.Stream.on('post', function (stream, byUserId, msg, clientId) {
  389. if (!stream) {
  390. return console.error("Streams.Stream.on POST: invalid stream!!!");
  391. }
  392.  
  393. if (_messageHandlers[msg.fields.type]) {
  394. _messageHandlers[msg.fields.type].call(this, msg);
  395. }
  396.  
  397. Streams.Stream.emit('post/'+msg.fields.type, stream, byUserId, msg);
  398. stream.notifyParticipants('Streams/post', byUserId, msg);
  399. });
  400.  
  401. /**
  402. * @property socketServer
  403. * @type {SocketNamespace}
  404. * @private
  405. */
  406. socket = Users.Socket.listen({
  407. host: pubHost,
  408. port: pubPort,
  409. https: Q.Config.get(['Q', 'node', 'https'], false) || {},
  410. });
  411.  
  412. socket.io.of('/Users').on('connection', function(client) {
  413. Q.log("Socket.IO client connected " + client.id);
  414. if (client.alreadyListeningStreams) {
  415. return;
  416. }
  417. client.alreadyListeningStreams = true;
  418. client.on('Streams/observe',
  419. function (sessionId, clientId, publisherId, streamName, fn) {
  420. if (!_validateSessionId(sessionId, fn)) {
  421. return;
  422. }
  423. if (typeof publisherId !== 'string'
  424. || typeof streamName !== 'string') {
  425. return fn && fn({
  426. type: 'Streams.Exception.BadArguments',
  427. message: 'Bad arguments'
  428. });
  429. }
  430. var observer = Q.getObject(
  431. [publisherId, streamName, client.id], Streams.observers
  432. );
  433. if (observer) {
  434. return fn && fn(null, true);
  435. }
  436. Streams.fetchOne('', publisherId, streamName, function (err, stream) {
  437. if (err || !stream) {
  438. return fn && fn({
  439. type: 'Users.Exception.NotAuthorized',
  440. message: 'not authorized'
  441. });
  442. }
  443. stream.testReadLevel('messages', function (err, allowed) {
  444. if (err || !allowed) {
  445. return fn && fn({
  446. type: 'Users.Exception.NotAuthorized',
  447. message: 'not authorized'
  448. });
  449. }
  450. var clients = Q.getObject(
  451. [publisherId, streamName], Streams.observers, null, {}
  452. );
  453. var max = Streams.Stream.getConfigField(
  454. stream.fields.type,
  455. 'observersMax'
  456. );
  457. if (max && Object.keys(clients).length >= max - 1) {
  458. return fn && fn({
  459. type: 'Streams.Exception.TooManyObservers',
  460. message: 'too many observers already'
  461. });
  462. }
  463. clients[client.id] = client;
  464. Q.setObject(
  465. [client.id, publisherId, streamName], true, Streams.observing
  466. );
  467. return fn && fn(null, true);
  468. });
  469. });
  470. });
  471. client.on('Streams/neglect',
  472. function (sessionId, clientId, publisherId, streamName, fn) {
  473. if (!_validateSessionId(sessionId, fn)) {
  474. return;
  475. }
  476. var o = Streams.observers;
  477. if (!Q.getObject([publisherId, streamName, client.id], o)) {
  478. return fn && fn(null, false);
  479. }
  480. delete o[publisherId][streamName][client.id];
  481. delete Streams.observing[client.id][publisherId][streamName];
  482. return fn && fn(null, true);
  483. });
  484. client.on('disconnect', function () {
  485. var observing = Streams.observing[client.id];
  486. if (!observing) {
  487. return;
  488. }
  489. for (var publisherId in observing) {
  490. var p = observing[publisherId];
  491. for (var streamName in p) {
  492. delete Streams.observers[publisherId][streamName][client.id];
  493. }
  494. }
  495. delete Streams.observing[client.id];
  496. });
  497. });
  498. return true;
  499. };
  500.  
  501. /**
  502. * Stores socket.io clients observing streams
  503. * @property clients
  504. * @type {Object}
  505. */
  506. Streams.observers = {};
  507.  
  508. /**
  509. * Stores streams that socket.io clients are observing
  510. * @property clients
  511. * @type {Object}
  512. */
  513. Streams.observing = {};
  514.  
  515. function _validateSessionId(sessionId, fn) {
  516. // Validate sessionId to make sure we generated it
  517. var result = Users.Session.decodeId(sessionId);
  518. if (result[0]) {
  519. return true;
  520. }
  521. fn && fn({
  522. type: 'Users.Exception.BadSessionId',
  523. message: 'bad session id'
  524. });
  525. return false;
  526. }
  527.  
  528. function Streams_request_handler (req, res, next) {
  529. var parsed = req.body;
  530. if (!parsed || !parsed['Q/method']) {
  531. return next();
  532. }
  533. var participant, stream, msg, posted, streams, deviceId, title, k;
  534. var userIds, invitingUserId, username, appUrl, parts, rest, label, myLabel;
  535. var readLevel, writeLevel, adminLevel, permissions, displayName, expireTime, logKey;
  536. var clientId = parsed["Q.clientId"];
  537. var stream = parsed.stream
  538. && Streams.Stream.construct(JSON.parse(parsed.stream), true);
  539. switch (parsed['Q/method']) {
  540. case 'Users/device':
  541. break;
  542. case 'Users/logout':
  543. var userId = parsed.userId;
  544. var sessionId = parsed.sessionId;
  545. if (userId && sessionId) {
  546. var clients = Users.clients[userId];
  547. for (var cid in clients) {
  548. if (clients[cid].sessionId === sessionId) {
  549. clients[cid].disconnect();
  550. }
  551. }
  552. }
  553. Users.pushNotifications(userId, {
  554. badge: 0
  555. });
  556. break;
  557. case 'Streams/Stream/join':
  558. participant = new Streams.Participant(JSON.parse(parsed.participant));
  559. participant.fillMagicFields();
  560. userId = participant.fields.userId;
  561. if (Q.Config.get(['Streams', 'logging'], false)) {
  562. Q.log('Streams.listen: Streams/Stream/join {'
  563. + '"publisherId": "' + stream.fields.publisherId
  564. + '", "name": "' + stream.fields.name
  565. + '"}'
  566. );
  567. }
  568. // invalidate cache for this stream
  569. // Streams.getParticipants.forget(stream.fields.publisherId, stream.fields.name);
  570. // inform user's clients about change
  571. Users.Socket.emitToUser(userId, 'Streams/join', participant);
  572. Streams.Stream.emit('join', stream, userId, clientId);
  573. break;
  574. case 'Streams/Stream/visit':
  575. participant = JSON.parse(parsed.participant);
  576. userId = participant.userId;
  577. Streams.Stream.emit('visit', stream, userId, clientId);
  578. break;
  579. case 'Streams/Stream/leave':
  580. participant = new Streams.Participant(JSON.parse(parsed.participant));
  581. participant.fillMagicFields();
  582. userId = participant.fields.userId;
  583. if (Q.Config.get(['Streams', 'logging'], false)) {
  584. Q.log('Streams.listen: Streams/Stream/leave {'
  585. + '"publisherId": "' + stream.fields.publisherId
  586. + '", "name": "' + stream.fields.name
  587. + '"}'
  588. );
  589. }
  590. // invalidate cache for this stream
  591. // Streams.getParticipants.forget(stream.fields.publisherId, stream.fields.name);
  592. // inform user's clients about change
  593. Users.Socket.emitToUser(userId, 'Streams/leave', participant);
  594. Streams.Stream.emit('leave', stream, userId, clientId);
  595. break;
  596. case 'Streams/Stream/remove':
  597. if (Q.Config.get(['Streams', 'logging'], false)) {
  598. Q.log('Streams.listen: Streams/Stream/remove {'
  599. + '"publisherId": "' + stream.fields.publisherId
  600. + '", "name": "' + stream.fields.name
  601. + '"}'
  602. );
  603. }
  604. // invalidate cache
  605. stream.notifyParticipants('Streams/remove', null, {
  606. publisherId: stream.fields.publisherId,
  607. name: stream.fields.name
  608. });
  609. Streams.Stream.emit('remove', stream, clientId);
  610. break;
  611. case 'Streams/Stream/create':
  612. if (Q.Config.get(['Streams', 'logging'], false)) {
  613. Q.log('Streams.listen: Streams/Stream/create {'
  614. + '"publisherId": "' + stream.fields.publisherId
  615. + '", "name": "' + stream.fields.name
  616. + '"}'
  617. );
  618. }
  619. Streams.Stream.emit('create', stream, clientId);
  620. // no need to notify anyone
  621. break;
  622. case 'Streams/Message/post':
  623. msg = Streams.Message.construct(JSON.parse(parsed.message), true);
  624. msg.fillMagicFields();
  625. if (Q.Config.get(['Streams', 'logging'], false)) {
  626. Q.log('Streams.listen: Streams/Message/post {'
  627. + '"publisherId": "' + stream.fields.publisherId
  628. + '", "name": "' + stream.fields.name
  629. + '", "msg.type": "' + msg.fields.type
  630. + '"}'
  631. );
  632. }
  633. Streams.Stream.emit('post', stream, msg.fields.byUserId, msg, clientId);
  634. break;
  635. case 'Streams/Message/postMessages':
  636. posted = JSON.parse(parsed.posted);
  637. streams = parsed.streams && JSON.parse(parsed.streams);
  638. if (!streams) break;
  639. for (k in posted) {
  640. msg = Streams.Message.construct(posted[k], true);
  641. msg.fillMagicFields();
  642. stream = Streams.Stream.construct(
  643. streams[msg.fields.publisherId][msg.fields.streamName], true
  644. );
  645. if (Q.Config.get(['Streams', 'logging'], false)) {
  646. Q.log('Streams.listen: Streams/Message/post {'
  647. + '"publisherId": "' + stream.fields.publisherId
  648. + '", "name": "' + stream.fields.name
  649. + '", "msg.type": "' + msg.fields.type
  650. + '"}'
  651. );
  652. }
  653. Streams.Stream.emit('post', stream, msg.fields.byUserId, msg, clientId);
  654. }
  655. break;
  656. case 'Streams/Stream/invite':
  657. try {
  658. userIds = JSON.parse(parsed.userIds);
  659. invitingUserId = parsed.invitingUserId;
  660. username = parsed.username;
  661. appUrl = parsed.appUrl;
  662. readLevel = parsed.readLevel || null;
  663. writeLevel = parsed.writeLevel || null;
  664. adminLevel = parsed.adminLevel || null;
  665. permissions = parsed.permissions || null;
  666. displayName = parsed.displayName || '';
  667. expireTime = parsed.expireTime ? new Date(parsed.expireTime*1000) : null;
  668. } catch (e) {
  669. return res.send({data: false});
  670. }
  671. res.send({data: true});
  672. if (logKey = Q.Config.get(['Streams', 'logging'], false)) {
  673. Q.log(
  674. 'Streams.listen: Streams/Stream/invite {'
  675. + '"publisherId": "' + stream.fields.publisherId
  676. + '", "name": "' + stream.fields.name
  677. + '", "userIds": ' + parsed.userIds
  678. + '}',
  679. logKey
  680. );
  681. }
  682.  
  683. if (expireTime && expireTime <= new Date()) {
  684. break;
  685. }
  686. persist();
  687. return;
  688. default:
  689. break;
  690. }
  691. return next();
  692. function persist () {
  693. Q.each(userIds, function (i, userId) {
  694. var token = null;
  695. // TODO: Change this to a getter, so that we can do throttling in case there are too many userIds
  696. (new Streams.Participant({
  697. "publisherId": stream.fields.publisherId,
  698. "streamName": stream.fields.name,
  699. "userId": userId,
  700. "state": "participating"
  701. })).retrieve(_participant);
  702. function _participant(err, rows) {
  703. if (rows && rows.length) {
  704. // User is already a participant in the stream.
  705. return;
  706. }
  707. (new Streams.Invite({
  708. "userId": userId,
  709. "state": "pending",
  710. "publisherId": stream.fields.publisherId,
  711. "streamName": stream.fields.name,
  712. "invitingUserId": invitingUserId,
  713. "displayName": displayName,
  714. "appUrl": appUrl,
  715. "readLevel": readLevel,
  716. "writeLevel": writeLevel,
  717. "adminLevel": adminLevel,
  718. "permissions": permissions,
  719. "expireTime": expireTime
  720. })).save(_inviteSaved);
  721. }
  722. var invite;
  723.  
  724. function _inviteSaved(err) {
  725. if (err) {
  726. Q.log("ERROR: Failed to save Streams.Invite for user '"+userId+"' during invite");
  727. Q.log(err);
  728. return;
  729. }
  730. token = this.fields.token;
  731. invite = this;
  732. // now ready to save Streams.Invited row
  733. (new Streams.Invited({
  734. "token": token,
  735. "userId": userId,
  736. "state": "pending",
  737. "expireTime": expireTime
  738. })).save(_invitedSaved);
  739. }
  740.  
  741. function _invitedSaved(err) {
  742. if (err) {
  743. Q.log("ERROR: Failed to save Streams.Invited for user '"+userId+"' during invite");
  744. Q.log(err);
  745. return;
  746. }
  747. (new Streams.Participant({
  748. "publisherId": stream.fields.publisherId,
  749. "streamName": stream.fields.name,
  750. "streamType": stream.fields.type,
  751. "userId": userId,
  752. "state": "invited",
  753. "reason": ""
  754. })).save(true, _participantSaved);
  755. // Write some files, if requested
  756. // SECURITY: Here we trust the input, which should only be sent internally
  757. if (parsed.template) {
  758. new Users.User({id: userId})
  759. .retrieve(function () {
  760. var fields = Q.extend({}, parsed, {
  761. stream: stream,
  762. user: this,
  763. invite: invite,
  764. link: invite.url(),
  765. app: Q.app,
  766. communityId: Users.communityId(),
  767. communityName: Users.communityName(),
  768. appRootUrl: Q.Config.expect(['Q', 'web', 'appRootUrl'])
  769. });
  770. var html = Q.Handlebars.render(parsed.template, fields);
  771. var path = Streams.invitationsPath(invitingUserId)
  772. +'/'+parsed.batchName;
  773. var filename = path + '/'
  774. + Q.normalize(stream.fields.publisherId) + '-'
  775. + Q.normalize(stream.fields.name) + '-'
  776. + this.fields.id + '.html';
  777. fs.writeFile(filename, html, function (err) {
  778. if (err) {
  779. Q.log(err);
  780. }
  781. });
  782. });
  783. }
  784. }
  785.  
  786. function _participantSaved(err) {
  787. if (err) {
  788. Q.log("ERROR: Failed to save Streams.Participant for user '"+userId+"' during invite");
  789. Q.log(err);
  790. return;
  791. }
  792.  
  793. // Now post a message to Streams/invited stream
  794. Streams.fetchOne(invitingUserId, userId, 'Streams/invited', _stream);
  795. }
  796.  
  797. function _stream(err, invited) {
  798. if (err) {
  799. Q.log("ERROR: Failed to get invited stream for user '"+userId+"' during invite");
  800. Q.log(err);
  801. return;
  802. }
  803. Streams.Stream.emit('invite', invited.getFields(), userId, stream);
  804. if (!invited.testWriteLevel('post')) {
  805. Q.log("ERROR: Not authorized to post to invited stream for user '"+userId+"' during invite");
  806. return;
  807. }
  808. var invitedUrl = Streams.invitedUrl(token);
  809. displayName = displayName || "Someone";
  810. var msg = {
  811. publisherId: invited.fields.publisherId,
  812. streamName: invited.fields.name,
  813. byUserId: invitingUserId,
  814. type: 'Streams/invite',
  815. sentTime: new Db.Expression("CURRENT_TIMESTAMP"),
  816. state: 'posted',
  817. content: displayName + " invited you to " + invitedUrl,
  818. instructions: JSON.stringify({
  819. token: token,
  820. displayName: displayName,
  821. appUrl: appUrl,
  822. invitedUrl: invitedUrl,
  823. type: stream.fields.type,
  824. title: stream.fields.title,
  825. content: stream.fields.content
  826. })
  827. };
  828. invited.post(msg, function (err) {
  829. if (err) {
  830. Q.log("ERROR: Failed to save message for user '"+userId+"' during invite");
  831. Q.log(err);
  832. }
  833. });
  834. }
  835. });
  836.  
  837. }
  838. }
  839.  
  840. // Connection from socket.io
  841. Users.on('connected', function(client, wasOnline) {
  842. if (!wasOnline) {
  843. // post "connected" message to Streams/participating stream
  844. new Streams.Stream({
  845. publisherId: client.userId,
  846. name: 'Streams/participating'
  847. }).post(client.userId, {
  848. type: 'Streams/connected'
  849. }, function(err) {
  850. if (err) console.error(err);
  851. });
  852. }
  853. });
  854.  
  855. Users.on('disconnected', function (userId) {
  856. // post "disconnected" message to Streams/participating stream
  857. new Streams.Stream({
  858. publisherId: userId,
  859. name: 'Streams/participating'
  860. }).post({
  861. byUserId: userId,
  862. type: 'Streams/disconnected'
  863. }, function(err) {
  864. if (err) console.error(err);
  865. Q.log('User disconnected: ' + userId);
  866. });
  867. });
  868.  
  869. /**
  870. * Retrieve stream participants
  871. * @method getParticipants
  872. * @static
  873. * @param {String} publisherId The publisher Id
  874. * @param {String} streamName The name of the stream
  875. * @param {Function} [callback=null] Callback receives a map of {userId: participant} pairs
  876. */
  877. Streams.getParticipants = function(publisherId, streamName, callback) {
  878. var args = arguments;
  879. if (!callback) return;
  880. Streams.Participant.SELECT('*').where({
  881. publisherId: publisherId,
  882. streamName: streamName
  883. }).execute(function (err, rows) {
  884. if (err) {
  885. Q.log(err);
  886. // Streams.getParticipants.forget(publisherId, streamName);
  887. callback({});
  888. } else {
  889. var result = {};
  890. for (var i=0; i<rows.length; ++i) {
  891. result [ rows[i].fields.userId ] = rows[i];
  892. }
  893. callback(result);
  894. }
  895. });
  896. };
  897.  
  898. /**
  899. * Retrieve socket.io clients registered to observe the stream
  900. * by sending "Streams/join" events through the socket.
  901. * @method getObservers
  902. * @static
  903. * @param {String} publisherId The publisher Id
  904. * @param {String} streamName The name of the stream
  905. * @param {Function} [callback=null] Callback receives a map of {clientId: socketClient} pairs
  906. */
  907. Streams.getObservers = function(publisherId, streamName, callback) {
  908. var observers = Q.getObject([publisherId, streamName], Streams.observers);
  909. callback && callback(observers || {});
  910. };
  911.  
  912. /**
  913. * Retrieve stream with calculated access rights
  914. * @method fetch
  915. * @static
  916. * @param {String} asUserId
  917. * The user id to calculate access rights
  918. * @param {String} publisherId
  919. * The publisher Id
  920. * @param {String|Array|Db.Range} streamName
  921. * The name of the stream, or an array of names, or a Db.Range
  922. * @param callback=null {function}
  923. * Callback receives (err, streams) as parameters
  924. * @param {String} [fields='*']
  925. * Comma delimited list of fields to retrieve in the stream.
  926. * Must include at least "publisherId" and "name".
  927. * since make up the primary key of the stream table.
  928. * You can skip this argument if you want.
  929. * @param {Object} [options={}]
  930. * Provide additional query options like 'limit', 'offset', 'orderBy', 'where' etc.
  931. * @see Db_Query_Mysql::options().
  932. */
  933. Streams.fetch = function (asUserId, publisherId, streamName, callback, fields, options) {
  934. if (!callback) return;
  935. if (!publisherId || !streamName) {
  936. return callback(new Error("Wrong arguments"));
  937. }
  938. if (typeof streamName.charAt === 'function'
  939. && streamName.charAt(streamName.length-1) === '/') {
  940. streamName = new Db.Range(streamName, true, false, streamName.slice(0, -1)+'0');
  941. }
  942. if (Q.isPlainObject(fields)) {
  943. options = fields;
  944. fields = '*';
  945. }
  946. fields = fields || '*';
  947. Streams.Stream.SELECT(fields)
  948. .where({publisherId: publisherId, name: streamName})
  949. .options(options)
  950. .execute(function(err, res) {
  951. if (err) {
  952. return callback(err);
  953. }
  954. if (!res.length) {
  955. return callback(null, []);
  956. }
  957. var p = new Q.Pipe(res.map(function(a) { return a.fields.name; }),
  958. function(params, subjects) {
  959. for (var name in params) {
  960. if (params[name][0]) {
  961. callback(params[name][0]); // there was an error
  962. return;
  963. }
  964. }
  965. callback(null, subjects); // success
  966. });
  967. for (var i=0; i<res.length; i++) {
  968. res[i].calculateAccess(asUserId, p.fill(res[i].fields.name));
  969. }
  970. });
  971. };
  972.  
  973. /**
  974. * Retrieve stream with calculated access rights
  975. * @method fetchOne
  976. * @static
  977. * @param {String} asUserId
  978. * The user id to calculate access rights
  979. * @param {String} publisherId
  980. * The publisher Id
  981. * @param {String} streamName
  982. * The name of the stream
  983. * @param {Function} [callback=null]
  984. * Callback receives the (err, stream) as parameters
  985. * @param {String} [fields='*']
  986. * Comma delimited list of fields to retrieve in the stream.
  987. * Must include at least "publisherId" and "name".
  988. * since make up the primary key of the stream table.
  989. * You can skip this argument if you want.
  990. * @param {Object} [options={}]
  991. * Provide additional query options like 'limit', 'offset', 'orderBy', 'where' etc.
  992. * @see Db_Query_Mysql::options().
  993. */
  994. Streams.fetchOne = function (asUserId, publisherId, streamName, callback, fields, options) {
  995. if (!callback) return;
  996. if (!publisherId || !streamName
  997. || typeof publisherId !== 'string'
  998. || typeof streamName !== 'string') {
  999. return callback(new Error("Wrong arguments"));
  1000. }
  1001. if (Q.isPlainObject(fields)) {
  1002. options = fields;
  1003. fields = '*';
  1004. }
  1005. Streams.Stream.SELECT('*')
  1006. .where({publisherId: publisherId, name: streamName})
  1007. .options(options)
  1008. .limit(1).execute(function(err, res) {
  1009. if (err) {
  1010. return callback(err);
  1011. }
  1012. if (!res.length) {
  1013. callback(null, null);
  1014. }
  1015. res[0].calculateAccess(asUserId, function () {
  1016. callback.call(res[0], null, res[0]);
  1017. });
  1018. });
  1019. };
  1020.  
  1021. /**
  1022. * Register a message handler
  1023. * @method messageHandler
  1024. * @static
  1025. * @param {String} msgType
  1026. * Type of stream
  1027. * @param {Function} callback
  1028. * The handler for stream messages
  1029. */
  1030. Streams.messageHandler = function(msgType, callback) {
  1031. if (callback === undefined) {
  1032. return _messageHandlers[msgType];
  1033. }
  1034. if (typeof callback !== 'function') {
  1035. throw new Q.Exception("Streams: callback passed to messageHandler is not a function");
  1036. }
  1037. _messageHandlers[msgType] = callback;
  1038. };
  1039.  
  1040. /**
  1041. * Calculate the url of a stream's icon
  1042. * @static
  1043. * @method iconUrl
  1044. * @param {String} icon the value of the stream's "icon" field
  1045. * @param {Number} [size=40] the size of the icon to render. Defaults to 40.
  1046. * @return {String} the url
  1047. */
  1048. Streams.iconUrl = function(icon, size) {
  1049. if (!icon) {
  1050. console.warn("Streams.iconUrl: icon is empty");
  1051. return '';
  1052. }
  1053. if (!size || size === true) {
  1054. size = '40';
  1055. }
  1056. size = (String(size).indexOf('.') >= 0) ? size : size+'.png';
  1057. var src = Q.interpolateUrl(icon + '/' + size);
  1058. return src.isUrl() || icon.substr(0, 2) == '{{'
  1059. ? src
  1060. : Q.url('{{Streams}}/img/icons/'+src);
  1061. };
  1062.  
  1063. Streams.invitedUrl = function _Streams_invitedUrl(token) {
  1064. return Q.url(Q.Config.get(['Streams', 'invites', 'baseUrl'], "i"))
  1065. + "/" + token;
  1066. };
  1067.  
  1068. Streams.invitationsPath = function _Streams_invitationsPath(userId) {
  1069. var subpath = Q.Config.get(
  1070. 'Streams', 'invites', 'subpath',
  1071. '{{app}}/uploads/Streams/invitations'
  1072. );
  1073. return Q.app.FILES_DIR + '/' + subpath.interpolate({
  1074. app: Q.Config.expect(['Q', 'app'])
  1075. }) + '/' + Q.Utils.splitId(userId);
  1076. };
  1077. /**
  1078. * Use this to check whether variable is a Q.Streams.Stream object
  1079. * @static
  1080. * @method isStream
  1081. * @param {mixed} testing
  1082. * @return {boolean}
  1083. */
  1084. Streams.isStream = function (testing) {
  1085. return Q.typeOf(testing) === "Q.Streams.Stream";
  1086. };
  1087.  
  1088. /**
  1089. * @property _messageHandlers
  1090. * @type object
  1091. * @private
  1092. */
  1093. var _messageHandlers = {};
  1094. /**
  1095. * @property _streams
  1096. * @type object
  1097. * @private
  1098. */
  1099. var _streams = {};
  1100.  
  1101. /* * * */
  1102.