<?php
/**
* @module Streams
*/
/**
* Can be used by $app/admins of a community.
* Starts a Streams/task to create users from an uploaded csv file, and
* invite them to the "Streams/experience/main" stream, as well as any
* other experience streams named in the csv.
* @class HTTP Streams import
* @method post
* @param {array} [$_REQUEST]
* @param {string} [$_REQUEST.communityId=Users::communityId] If you want to override it
* @param {string} [$_REQUEST.taskStreamName] Pass the name of a task stream to resume it.
* In this case, you don't need to pass the file, because it was saved.
* @param {array} [$_FILES] Array consisting of one or more CSV files.
* The first line consists of titles or names of streams loaded from
* JSON files named under Streams/userStreams config.
* The users are invited to the "Streams/experience/main" experience,
* and any other experiences listed in the "Experiences" column, if it
* is present in the CSV file.
* @return {void}
*/
function Streams_import_post()
{
$app = Q::app();
$luid = Users::loggedInUser(true)->id;
$communityId = Q::ifset($_REQUEST, 'communityId', Users::communityId());
$all = Streams::userStreamsTree()->getAll();
$exceptions = array();
$users = array();
$task = isset($_REQUEST['taskStreamName'])
? Streams::fetchOne($luid, $communityId, $_REQUEST['taskStreamName'], true)
: Streams::create($luid, $communityId, 'Streams/task', array(
'skipAccess' => true,
'title' => 'Importing members into ' . Users::communityName()
), array(
'publisherId' => $app,
'streamName' => "Streams/tasks/app",
'type' => 'Streams/import'
));
$task->addPreloaded();
Q_Response::setSlot('taskStreamName', $task->name);
// TODO: make worker php scripts that loop and find task streams
// which have not been finished yet, start a transaction to
// mark their state "progress", and start processing the items.
// Also have way to inquire about whether a task is in progress,
// and if no response within a certain timeout, mark it as paused,
// available for any other worker to resume making progress on it.
// store the instructions
if (!empty($_FILES)) {
$file = reset($_FILES);
$tmp = $file['tmp_name'];
$task->instructions = file_get_contents($tmp);
$task->save();
unlink($tmp);
}
if (!$task->instructions) {
return;
}
$instructions = $task->instructions;
// Send the response and keep going.
// WARN: this potentially ties up the PHP thread for a long time
$timeLimit = Q_Config::get('Streams', 'import', 'timeLimit', 100000);
ignore_user_abort(true);
set_time_limit($timeLimit);
Q_Dispatcher::response(true);
session_write_close();
// count the number of rows
$lineCount = substr_count($instructions, PHP_EOL);
$task->setAttribute('items', $lineCount);
// start parsing the rows
$j = 0;
foreach (Q_Utils::csvLines($instructions) as $line) {
$row = str_getcsv($line, ",");
if (!$row) {
continue;
}
if (++$j === 1) {
// get the fields from the first row
$fields = str_getcsv($instructions, ',');
if ($fields === false) {
return;
}
$emailAddressKey = Q_Utils::normalize('Email Address');
$mobileNumberKey = Q_Utils::normalize('Mobile Number');
$processed = $task->getAttribute('processed', 0);
continue;
}
if ($j <= $processed) {
continue;
}
$notEmpty = false;
foreach ($row as $v) {
if ($v) {
$notEmpty = true;
break;
}
}
if (!$notEmpty) {
continue;
}
// get the data from the row
$data = array();
$importUserData = array();
$streamNames = array();
foreach ($row as $i => $value) {
$field = $fields[$i];
$fn = Q_Utils::normalize($field);
$data[$fn] = $value;
if ($fn === 'experiences') {
$experiences = explode("\n", $value);
foreach ($experiences as $experience) {
$streams = Streams::lookup($communityId, 'Streams/experience', $value);
$stream = reset($streams);
if (!$stream) {
$vn = Q_Utils::normalize($value);
$stream = Streams::fetchOne($luid, $communityId, "Streams/experience/$vn");
if (!$stream) {
$stream = Streams::create($luid, $communityId, 'Streams/experience', array(
'name' => "Streams/experience/$vn",
'title' => $value
));
}
}
$streamNames[] = $stream->name;
}
continue;
}
if ($fn === 'labels') {
$labelTitles = explode("\n", $value);
continue;
}
foreach ($all as $n => $info) {
if ($fn === Q_Utils::normalize($n)
or $fn === Q_Utils::normalize(Q::ifset($info, 'title', ''))) {
$importUserData[$n] = $value;
break;
}
}
}
try {
$streams = Streams::fetch($luid, $communityId, $streamNames);
foreach ($streams as $stream) {
if (!$stream->testAdminLevel('manage')) {
throw new Users_Exception_NotAuthorized();
}
}
// prepare the identifier
Users::$cache['importUserData'] = $importUserData;
$emailAddress = Q::ifset($data, $emailAddressKey, null);
$mobileNumber = Q::ifset($data, $mobileNumberKey, null);
if ($mobileNumber and $emailAddress) {
if (Users::identify('mobile', $mobileNumber)) {
$identifier = $mobileNumber;
$alsoAddEmail = $emailAddress;
} else if (Users::identify('email', $emailAddress)) {
$identifier = $emailAddress;
$alsoAddMobile = $mobileNumber;
} else {
$identifier = $mobileNumber;
$alsoAddEmail = $emailAddress;
}
} else if ($mobileNumber) {
$identifier = $mobileNumber;
} else if ($emailAddress) {
$identifier = $emailAddress;
} else {
continue; // no one to invite
}
$identifier = $mobileNumber ? $mobileNumber : $emailAddress;
// invite the user
$sn = 'Streams/experience/main';
$result = Streams::invite($communityId, $sn, compact('identifier'));
Users::$cache['importUserData'] = null; // already saved this data
$userId = reset($result['userIds']);
$users[$userId] = $user = Users::fetch($userId, true);
if (isset($alsoAddEmail)) {
$user->addEmail($alsoAddEmail); // sends addEmail message
}
if (isset($alsoAddMobile)) {
$user->addMobile($alsoAddMobile); // sends addMobile message
}
$task->setAttribute('processed', $j);
$task->setAttribute('progress', min($j / $lineCount, 1));
$task->post($luid, array(
'type' => 'Streams/task/progress',
'instructions' => compact('mobileNumber', 'emailAddress', 'user', 'processed', 'progress'),
), true);
foreach ($streamNames as $sn) {
// the following sends an invite message and link by email or mobile
Streams::invite($communityId, $sn, compact('identifier'));
}
if (!empty($labelTitles)) {
$labels = Users_Label::fetch($luid);
foreach ($labelTitles as $title) {
$found = false;
foreach ($labels as $label) {
if ($label->title = $title) {
$found = true;
break;
}
}
$tn = Q_Utils::normalize($title);
$label = "Users/$tn";
if (!$found) {
Users_Label::addLabel($label, $communityId, $title, $luid);
}
Users_Contact::addContact($communityId, $label, $userId, null, $luid);
}
}
} catch (Exception $e) {
$exceptions[$j] = $e;
}
}
// if we reached here, then the task has completed
$task->instructions = '';
$task->post($luid, array(
'type' => 'Streams/task/complete'
), true);
}