Show:

File: platform/plugins/Streams/classes/Streams/Subscription.js

/**
 * Class representing subscription rows.
 *
 * @module Streams
 */
var Q = require('Q');
var Db = Q.require('Db');
var Streams = Q.require('Streams');

/**
 * Class representing 'Subscription' rows in the 'Streams' database
 * <br/>{"type": [ array of message types ], "notifications": 5}
 * @namespace Streams
 * @class Subscription
 * @extends Base.Streams.Subscription
 * @constructor
 * @param fields {object} The fields values to initialize table row as
 * an associative array of `{column: value}` pairs
 */
function Streams_Subscription (fields) {

	// Run constructors of mixed in objects
	Streams_Subscription.constructors.apply(this, arguments);

	/*
	 * Add any other methods to the model class by assigning them to this.
	 
	 * * * */
}

Q.mixin(Streams_Subscription, Q.require('Base/Streams/Subscription'));

/**
 * Test message according to filters set up for the user and generate array of subscription rules
 * @method test
 * @static
 * @param {String} userId
 * @param {String} publisherId
 * @param {Q.Streams.Stream} stream
 * @param {Function} callback First argument is any possible error, second is array of delivery methods
 */
Streams_Subscription.test = function _Subscription_test(userId, stream, msgType, callback) {
	if (!callback) return;
	(new Streams.Subscription({
		ofUserId: userId,
		publisherId: stream.fields.publisherId,
		streamName: stream.fields.name
	})).retrieve(function(err, sub) {
		if (err) return callback(err);
		if (!sub.length) return callback(null, []); // no active subscriptions
		sub = sub[0];
		if (sub.fields.untilTime && sub.fields.untilTime > new Date()) return callback(null, []); // date passed
		var filter;
		try {
			if (sub.fields.filter) {
				filter = JSON.parse(sub.fields.filter);
			} else {
				filter = Stream.getConfigField(
					stream.fields.type, 
					['subscriptions', 'filter'],
					{ 
						types: [
							"^(?!(Users/)|(Streams/)).*/", 
							"Streams/relatedTo", 
							"Streams/chat/message"
						],
						notifications: 0
					}
				);
			}
		} catch (err) {
			return callback(err);
		}
		var types = filter.types;
		var matched = false;
		for (var i=0, l=types.length; i<l; ++i) {
			if (msgType.match(types[i])) {
				matched = true;
				break;
			}
		}
		var notifications = filter.notifications;
		if (!matched) {
			return callback(null, []); // not subscribed to this message type
		}
		Streams.SubscriptionRule.SELECT('*').where({
			ofUserId: userId,
			publisherId: stream.fields.publisherId,
			streamName: stream.fields.name
		}).execute(function(err, rules) {
			if (err) return callback(err);
			var waitFor = rules.map(function(r){ return r.fields.ordinal; });
			var p = new Q.Pipe(waitFor, 1, function (params) {
				var deliveries = [], ordinal, param;
				for (ordinal in params) {
					param = params[ordinal];
					if (param[0]) {
						return callback(param[0]);
					}
					if (param[1]) {
						deliveries.push(param[1]);
					}
				}
				callback(null, deliveries);
			});
			p.run();
			rules.forEach(function (rule) {
				var o = rule.fields.ordinal;
				var readyTime = rule.fields.readyTime;
				var filter;
				try {
					filter = rule.fields.filter ? JSON.parse(rule.fields.filter) : {};
				} catch (e) {
					return p.fill(o)(e);
				}
				var types = filter.types;
				function _checkNotifications() {
					if (!notifications) {
						return _checkDelivery();
					}
					// get last disconnection time
					Streams.Message.SELECT('publisherId, streamName, type, sentTime')
					.where({
						publisherId: userId,
						streamName: 'Streams/participating',
						type: 'Streams/disconnected'
					}).orderBy('sentTime', false)
					.limit(1)
					.execute(function(err, res) {
						if (err) {
							return p.fill(o)(err);
						}
						// NOTE: all Streams/participating for a given stream must be on the same shard
						var timeOnline = res.length
							? res.reduce(function(pv, cv) {
								return pv > cv ? pv : cv;
							}, res[0].sentTime)
							: (readyTime ? readyTime : 0);
						// now check notifications since timeOnline
						Streams.Notification.SELECT('COUNT(1) as count').where({
							userId: userId,
							"insertedTime >": timeOnline,
							publisherId: stream.fields.publisherId,
							streamName: stream.fields.streamName,
							type: msgType
						}).execute(function (err, res) {
							if (err) return p.fill(o)(err);
							// to support counting in shards
							var count = res.reduce(function(pv, cv) { 
								return pv + Number(cv.count); 
							}, 0);
							if (count < notifications) {
								_checkDelivery();
							} else {
								p.fill(o)();
							}
						}, {plain: true});
					}, { plain: true });
				}
				function _checkDelivery() {
					var deliver;
					try {
						deliver = rule.fields.deliver ? JSON.parse(rule.fields.deliver) : null;
					} catch (e) {
						p.fill(o)(e);
					}
					p.fill(o)(null, deliver);
				}
				var notFound = (
					types && Q.typeOf(types) === 'array'
					&& types.length && types.indexOf(msgType) < 0
				);
				if (notFound || (Date.fromTimestamp(readyTime) > new Date())) {
					// type and readyTime filters not passed
					return p.fill(o)();
				}					
				var labels = filter.labels;
				if (!labels || Q.typeOf(labels) !== "array" || !labels.length) {
					return _checkNotifications();
				}
				Users.Contact.SELECT('*').where({
					userId: userId,
					contactUserId: publisherId,
					label: labels
				}).execute(function (err, contacts) {
					if (err) {
						return p.fill(o)(err);
					}
					if (!contacts.length) {
						return p.fill(o)();
					}
					_checkNotifications();
				});
			});
		});
	});

	/* * * */
}

/**
 * The setUp() method is called the first time
 * an object of this class is constructed.
 * @method setUp
 */
Streams_Subscription.prototype.setUp = function () {
	// put any code here
};

module.exports = Streams_Subscription;