Show:

File: platform/plugins/Streams/classes/Streams/Message.js

/**
 * Class representing message rows.
 *
 * @module Streams
 */
var Q = require('Q');
var Db = Q.require('Db');
var Users = Q.require('Users');
var Streams = Q.require('Streams');
var Base_Streams_Message = Q.require('Base/Streams/Message');

// Give the Streams_Message constructor itself event-emitter methods
// (Streams_Message.emit is used by deliver() in this file).
// This call may precede the constructor's definition because
// function declarations are hoisted.
Q.makeEventEmitter(Streams_Message);

/**
 * Class representing 'Message' rows in the 'Streams' database
 * <br/>stored primarily on publisherId's fm server
 * @namespace Streams
 * @class Message
 * @extends Base.Streams.Message
 * @constructor
 * @param {Object} fields The field values to initialize the table row with,
 *  as an associative array of `{column: value}` pairs. It is not read
 *  directly here; all arguments are forwarded to the mixed-in constructors.
 */
function Streams_Message (fields) {

	// Run constructors of mixed in objects
	Streams_Message.constructors.apply(this, arguments);

	/*
	 * Add any other methods to the model class by assigning them to this.
	 
	 * * * */

	/* * * */
}

// Mix the generated base class into Streams_Message.
// NOTE(review): presumably this is what provides Streams_Message.constructors
// and the inherited prototype methods -- confirm against Q.mixin.
Q.mixin(Streams_Message, Base_Streams_Message);

// Registry of message constructors keyed by normalized message type.
// Entries are added by Streams_Message.define() and, lazily for types
// that were never defined, by Streams_Message.construct().
Streams_Message.defined = {};

/**
 * Constructs a message object using the constructor registered for its type.
 * If no constructor was registered with Streams_Message.define() for the
 * (normalized) type, a default one is created, cached in
 * Streams_Message.defined, and used from then on.
 * @method construct
 * @static
 * @param {Object} fields The field values of the message, or an object
 *  whose "fields" property holds them (e.g. another row object)
 * @param {Boolean} [retrieved=false] Pass true to mark the message as
 *  retrieved, with no modified fields
 * @return {Streams.Message|Boolean} The constructed message,
 *  or false if fields were empty
 */
Streams_Message.construct = function Streams_Message_construct(fields, retrieved) {
	if (Q.isEmpty(fields)) {
		// This function takes no callback, so simply report failure to the caller.
		// (Previously an undeclared "callback" was referenced here, which threw
		// a ReferenceError before "return false" could be reached.)
		return false;
	}
	if (fields.fields) {
		// a row-like object was passed, unwrap its field values
		fields = fields.fields;
	}
	var type = Q.normalize(fields.type);
	var MC = Streams_Message.defined[type];
	if (!MC) {
		// No constructor was defined for this type yet: create and cache a default one
		MC = Streams_Message.defined[type] = function MessageConstructor(fields) {
			MessageConstructor.constructors.apply(this, arguments);
			// Default constructor. Copy any additional fields.
			if (!fields) return;
			for (var k in fields) {
				this.fields[k] = Q.copy(fields[k]);
				if (this.fields[k] instanceof Buffer) {
					// binary column values are stored as strings
					this.fields[k] = this.fields[k].toString();
				}
			}
		};
		Q.mixin(MC, Streams_Message);
	}
	var message = new MC(fields);
	if (retrieved) {
		message.retrieved = true;
		message._fieldsModified = {};
	}
	return message;
};

/**
 * The setUp() method is called the first time
 * an object of this class is constructed.
 * It is intentionally empty here: it overrides the Base class version
 * and is the place to put any one-time set up code for this model.
 * @method setUp
 */
Streams_Message.prototype.setUp = function () {
	// put any code here
	// overrides the Base class
};

/**
 * Registers the constructor to use for messages of a given type.
 * The registered constructor runs the mixed-in Streams_Message constructors
 * first and then ctor, both with the same arguments and the same "this".
 * @static
 * @method define
 * @param {String|Object} type The type of the message, e.g. "Streams/chat/message".
 *  May also be a hash of {type: ctor} pairs, in which case every pair is
 *  registered (without extra methods) and nothing is returned.
 * @param {Function} ctor Your message's constructor. Anything other than
 *  a function causes an error to be thrown.
 * @param {Object} [methods] An optional hash of methods to add to the prototype
 * @return {Function} The constructor that was stored in Streams_Message.defined
 *  under the normalized type
 * @throws {Q.Error} If ctor is not a function
 */
Streams_Message.define = function (type, ctor, methods) {
	if (typeof type === 'object') {
		// Hash form: register each entry on its own
		for (var eachType in type) {
			Streams_Message.define(eachType, type[eachType]);
		}
		return;
	}
	var key = Q.normalize(type);
	if (typeof ctor !== 'function') {
		throw new Q.Error("Q.Streams.Message.define requires ctor to be a function");
	}
	var Custom = function CustomMessageConstructor() {
		// base constructors first, then the type-specific one
		Custom.constructors.apply(this, arguments);
		ctor.apply(this, arguments);
	};
	Q.mixin(Custom, Streams_Message);
	Q.extend(Custom.prototype, methods);
	Streams_Message.defined[key] = Custom;
	return Custom;
};

// Shorthand for the prototype, used to attach the instruction helper methods
var Mp = Streams_Message.prototype;

/**
 * Get all the instructions from a message, by parsing the JSON
 * stored in its "instructions" field.
 *
 * @method getAllInstructions
 * @return {mixed} The parsed instructions (normally an object), or undefined
 *  if the field is missing, empty or does not contain valid JSON
 */
Mp.getAllInstructions = function _Message_prototype_getAllInstructions () {
	var parsed;
	try {
		parsed = JSON.parse(this.fields.instructions);
	} catch (e) {
		// anything that cannot be parsed counts as "no instructions"
		parsed = undefined;
	}
	return parsed;
};

/**
 * Get the value of a single instruction in the message.
 * The instructions are parsed on every call via getAllInstructions().
 *
 * @method getInstruction
 * @param {String} instructionName The name of the instruction to look up
 * @return {mixed} Whatever Q.getObject finds under that name
 *  in the parsed instructions
 */
Mp.getInstruction = function _Message_prototype_get (instructionName) {
	var path = [instructionName];
	return Q.getObject(path, this.getAllInstructions());
};

/**
 * Posts a message to the stream.
 * Currently doesn't perform any access checks, so it is only meant to be called internally.
 * It is not as robust as the PHP version, which is meant for more general use.
 *
 * Everything happens in a single multi-statement query: the stream's messageCount
 * is read (and locked) into a MySQL user variable, the message is inserted with
 * the next ordinal, the per-type total and the stream's messageCount are updated,
 * and finally the stream and message rows are selected back.
 * On success, the "post" event is emitted on Streams.Stream with
 * (stream, byUserId, message, stream).
 * @method post
 * @static
 * @param {Object} fields
 *  The fields of the message. Requires publisherId, streamName and byUserId.
 *  Defaults are type "text/small", empty content, instructions and byClientId,
 *  and weight 1.
 * @param callback=null {function}
 *  Called with (err) if the query failed, otherwise with (null, byUserId, message)
 *  and with "this" set to the stream.
 * @throws {Q.Exception} If one of the required fields is missing
 */
Streams_Message.post = function (fields, callback)
{
	var required = {publisherId: true, streamName: true, byUserId: true};
	for (var k in required) {
		if (!fields[k]) {
			throw new Q.Exception("Streams.Message.post requires " + k);
		}
	}
	// Caller-supplied fields override these defaults
	var f = Q.extend({
		type: 'text/small',
		content: '',
		instructions: '',
		byClientId: '',
		weight: 1
	}, fields);
	f.sentTime = new Db.Expression("CURRENT_TIMESTAMP");
	// NOTE(review): msg is never used below. The call does still register a default
	// constructor for f.type in Streams_Message.defined if none exists -- confirm
	// whether anything relies on that before removing it.
	var msg = Streams.Message.construct(f);
	
	// The statements of the batch, in order (the indexes matter when reading results):
	// 0 START TRANSACTION, 1 SELECT ... INTO, 2 INSERT message, 3 INSERT message_total,
	// 4 UPDATE stream, 5 SELECT stream, 6 SELECT message, 7 COMMIT
	// NOTE(review): nothing here issues a ROLLBACK if a statement fails midway -- confirm
	// how rawQuery/execute treat the open transaction in that case.
	var query = 
	 " START TRANSACTION;"
	+"		SELECT messageCount"
	+"		  FROM {$prefix}stream"
	+"		  WHERE publisherId = ?"
	+"		  AND name = ?"
	+"		  INTO @Streams_messageCount"
	+"        FOR UPDATE;"
	+"		INSERT INTO {$prefix}message("
	+"			publisherId, streamName, byUserId, byClientId, sentTime, "
	+"			type, content, instructions, weight, ordinal"
	+"		)"
	+"		VALUES("
	+"			?, ?, ?, ?, CURRENT_TIMESTAMP,"
	+"			?, ?, ?, ?, @Streams_messageCount+1"
	+"		);"
	+"		INSERT INTO {$prefix}message_total("
	+"			publisherId, streamName, messageType, messageCount"
	+"		)"
	+"		VALUES("
	+"			?, ?, ?, @Streams_messageCount+1"
	+"		)"
	+"		ON DUPLICATE KEY UPDATE messageCount = messageCount+1;"
	+"		UPDATE {$prefix}stream"
	+"        SET messageCount = @Streams_messageCount+1,"
	+"            updatedTime = CURRENT_TIMESTAMP"
	+"		  WHERE publisherId = ?"
	+"		  AND name = ?;"
	+"		SELECT * FROM {$prefix}stream"
	+"		  WHERE publisherId = ?"
	+"		  AND name = ?;"
	+"		SELECT * FROM {$prefix}message"
	+"		  WHERE publisherId = ?"
	+"		  AND streamName = ?"
	+"		  AND ordinal = @Streams_messageCount+1;"
	+" COMMIT;";
	// Bound positionally to the "?" placeholders above, one row per statement
	// that has placeholders, so the order here must follow the query exactly
	var values = [
		f.publisherId, f.streamName,
		f.publisherId, f.streamName, f.byUserId, f.byClientId,
		f.type, f.content, f.instructions, f.weight,
		f.publisherId, f.streamName, f.type,
		f.publisherId, f.streamName,
		f.publisherId, f.streamName,
		f.publisherId, f.streamName
	];
	Streams.Stream.db()
	.rawQuery(query, values)
	.execute(function (params) {
		var err = params[""][0];
		if (err) {
			return callback && callback(err);
		}
		// presumably one result set per statement of the batch -- TODO confirm;
		// 5 and 6 are then the two trailing SELECTs (stream row, message row)
		var results = params[""][1];
		var stream = Streams.Stream.construct(results[5][0], true);
		var message = Streams.Message.construct(results[6][0]);
		Streams.Stream.emit('post', stream, f.byUserId, message, stream);
		callback && callback.call(stream, null, f.byUserId, message);
	});
};

/**
 * Delivers the message posted to stream according to particular
 * delivery method (see: Streams_SubscriptionRule.deliver). Message template is taken from views/{message.type} folder -
 * 'email.handlebars' or 'mobile.handlebars' or 'device.handlebars' depending on delivery,
 * falling back to the templates under 'Streams/message'.
 *
 * The flow is:
 * 1) the user is fetched, and an optional "Streams/deliver/:messageType" handler
 *    gets a chance to transform the delivery options,
 * 2) any destinations given directly in deliver (emailAddress, mobileNumber, deviceId)
 *    are delivered to,
 * 3) the configured entries are tried in order, and delivery stops after the first
 *    entry for which at least one destination was reached.
 * @method deliver
 * @param {Streams.Stream} stream
 * @param {String} toUserId the id of the user to whom to deliver
 * @param {String|Object} deliver can be any key under "Streams"/"rules"/"deliver" config,
 *   such as "default" or "invited". The value in the config should be an array of entries,
 *   each of which can be a string or an array itself.
 *   Entries can contain one or more of "email", "mobile" and "devices",
 *   as well as "email+pending", "mobile+pending" and the names of platforms listed
 *   under "Users"/"apps"/"platforms" config.
 *   They are tried, in order, and delivery stops as soon as at least one destination
 *   is found to deliver to.
 *   Or, it can be an object with "to" property
 *   as this key, or "emailAddress", "mobileNumber" or "deviceId" specified directly.
 * @param {Streams.Avatar} avatar the avatar for getting the displayName
 * @param {function} callback
 *	Callback reports errors and response from delivery systems.
 *	It is called with (err) if the user could not be fetched or no entry reached
 *	any destination, otherwise with (null, result, params) where result is the array
 *	of destinations that were attempted and params holds the responses of the
 *	entry that succeeded.
 */
Streams_Message.prototype.deliver = function(stream, toUserId, deliver, avatar, callback) {
	var instructions = this.getAllInstructions();
	// fields made available to the templates
	var fields = {
		app: Q.app,
		communityName: Users.communityName(),
		stream: stream,
		message: this,
		instructions: instructions,
		avatar: avatar,
		config: Q.Config.getAll()
	};
	var message = this;
	var messageType = this.fields.type;
	var subject = Streams.Stream.getConfigField(
		stream.fields.type,
		['messages', this.fields.type, 'subject'],
		Q.Config.get(
			['Streams', 'types', '*', 'messages', '*', 'subject'],
			'Please set config "Streams"/"types"/"*"/"messages"/"*"/"subject"'
		)
	);
	if (typeof deliver === 'string') {
		deliver = {to: deliver};
	}

	Users.fetch(toUserId, function (err) {
		if (err) {
			// The user's addresses and language come from this row,
			// so report the failure instead of delivering blindly
			return callback && callback(err);
		}
		var to = Q.Config.get(
			['Streams', 'rules', 'deliver', deliver.to],
			['devices', 'email', 'mobile']
		);
		var uf = this.fields; // "this" is the fetched user
		var p1 = new Q.Pipe();
		var streamUrl = stream.url(message.fields.ordinal);
		// Delivery options, which handlers of the events below may modify
		var o = {
			fields: fields,
			subject: subject,
			deliver: deliver,
			stream: stream,
			streamUrl: streamUrl,
			url: message.getInstruction("url") || streamUrl,
			icon: stream.iconUrl(80),
			user: this,
			avatar: avatar,
			callback: callback
		};
		var result = []; // every destination that delivery was attempted to

		/**
		 * @event "Streams/deliver/:messageType"
		 * @param {Object} options for the notification delivery
		 * @param {Function} callback to call when options has been transformed
		 */
		var name = 'Streams/deliver/'+message.fields.type;
		var handler = Q.getObject(name, Q.handlers, '/');
		if (!Q.isEmpty(handler)) {
			Q.handle(handler, message, [o, _afterTransform]);
		} else {
			_afterTransform();
		}

		// Deliver to destinations specified directly, then go through the configured entries
		function _afterTransform() {
			var w1 = [];
			var e, m, d;
			if (e = deliver.emailAddress || deliver.email) {
				_email(e, p1.fill('email'));
				w1.push('email');
			}
			if (m = deliver.mobileNumber || deliver.mobile) {
				_mobile(m, p1.fill('mobile'));
				w1.push('mobile');
			}
			if (d = deliver.deviceId || deliver.device) {
				_device(d, p1.fill('device'));
				w1.push('device');
			}
			p1.add(w1, function () {
				_next(0);
			}).run();
		}
		// Try the i-th configured entry, moving on to the next one if nothing was reached
		function _next(i) {
			var destinations = to[i];
			if (!destinations) {
				return callback(new Error("Streams.Message.prototype.deliver: no destinations"));
			}
			if (typeof destinations === 'string') {
				destinations = [destinations];
			}
			// array path, like every other Q.Config.get call in this file
			var platforms = Q.Config.get(['Users', 'apps', 'platforms'], []);
			var p2 = new Q.Pipe();
			var waitFor = [];
			Q.each(destinations, function (j, d) {
				var emailAddress = (d.indexOf('email') >= 0 && uf.emailAddress)
					|| (d === 'email+pending' && uf.emailAddressPending);
				var mobileNumber = (d.indexOf('mobile') >= 0 && uf.mobileNumber)
					|| (d === 'mobile+pending' && uf.mobileNumberPending);
				// Give the app an opportunity to modify the fields or anything else
				Streams_Message.emit('deliver/before', o);
				if (emailAddress) {
					_email(emailAddress, p2.fill('email'));
					waitFor.push('email');
				}
				if (mobileNumber) {
					_mobile(mobileNumber, p2.fill('mobile'));
					waitFor.push('mobile');
				}
				if (d === 'devices') {
					_device(null, p2.fill(d));
					waitFor.push(d);
				}
				if (platforms.indexOf(d) >= 0) {
					// _platform expects the platform name first, then the callback
					_platform(d, p2.fill(d));
					waitFor.push(d);
				}
			});
			p2.add(waitFor, function (params) {
				// params[k] holds the arguments passed to the callback of destination k
				var success = false;
				for (var k in params) {
					if (params[k][0]) {
						// an error was reported
						continue;
					}
					if (k === 'email' && params[k][1] === 'log') {
						// email was not sent
						continue;
					}
					if (k === 'mobile' && params[k][1] === 'log') {
						// mobile sms was not sent
						continue;
					}
					if (k === 'devices' && params[k][1].length === 0) {
						// no devices were reached
						continue;
					}
					if (platforms.indexOf(k) >= 0 && !params[k][1]) {
						// no platform apps were reached
						continue;
					}
					success = true;
				}
				if (success) {
					callback(null, result, params);
				} else {
					_next(i+1);
				}
			}).run();
		}
		function _email(emailAddress, callback) {
			o.destination = 'email';
			o.emailAddress = emailAddress;
			Streams_Message.emit('deliver/before', o);  // app may modify some fields
			var viewPath = messageType+'/email.handlebars';
			if (Q.Handlebars.template(viewPath) === null) {
				viewPath = 'Streams/message/email.handlebars';
			}
			Users.Email.sendMessage(
				emailAddress, o.subject, viewPath, o.fields, {html: true, language: uf.preferredLanguage}, callback
			);
			result.push({'email': emailAddress});
		}
		function _mobile(mobileNumber, callback) {
			o.destination = 'mobile';
			o.mobileNumber = mobileNumber;
			Streams_Message.emit('deliver/before', o); // app may modify some fields
			var viewPath = messageType+'/mobile.handlebars';
			if (Q.Handlebars.template(viewPath) === null) {
				viewPath = 'Streams/message/mobile.handlebars';
			}
			Users.Mobile.sendMessage(mobileNumber, viewPath, o.fields, {language: uf.preferredLanguage}, callback);
			result.push({'mobile': mobileNumber});
		}
		function _device(deviceId, callback) {
			o.destination = 'devices';
			o.deviceId = deviceId;
			Streams_Message.emit('deliver/before', o); // app may modify some fields
			var viewPath = messageType+'/device.handlebars';
			if (!Q.Handlebars.template(viewPath)) {
				viewPath = 'Streams/message/device.handlebars';
			}

			Users.pushNotifications(
				toUserId, 
				{
					alert: { title: o.subject },
					payload: message.getAllInstructions(),
					url: o.url,
					icon: o.icon
				},
				callback, 
				{view: viewPath, fields: o.fields, language: uf.preferredLanguage},
				function (device) {
					// when a specific device was requested, filter out all the others
					if (deviceId && device.deviceId !== deviceId) {
						return false;
					}
				}
			);
			result.push({'devices': deviceId || true});
		}
		function _platform(platform, callback) {
			var appId = Users.appInfo(platform).appId;
			Users.AppUser.SELECT('*').WHERE({
				userId: toUserId,
				platform: platform,
				appId: appId
			}).execute(function (err, appusers) {
				if (err) {
					return callback(err);
				}
				var appuser = appusers[0];
				var notification = {
					alert: o.subject,
					href: o.url,
					ref: message.fields.type
				};
				if (appuser) {
					appuser.pushNotification(notification);
				}
				// appuser is undefined if the user has not connected this platform's app
				Q.handle(callback, Users, [null, appuser, notification]);
			});
		}
	});
};

module.exports = Streams_Message;