- <?php
-
- /**
- * @module Streams
- */
-
- /**
- * Can be used by $app/admins of a community.
- * Starts a Streams/task to create users from an uploaded csv file, and
- * invite them to the "Streams/experience/main" stream, as well as any
- * other experience streams named in the csv.
- * @class HTTP Streams import
- * @method post
- * @param {array} [$_REQUEST]
- * @param {string} [$_REQUEST.communityId=Users::communityId] If you want to override it
- * @param {string} [$_REQUEST.taskStreamName] Pass the name of a task stream to resume it.
- * In this case, you don't need to pass the file, because it was saved.
- * @param {array} [$_FILES] Array consisting of one or more CSV files.
- * The first line consists of titles or names of streams loaded from
- * JSON files named under Streams/userStreams config.
- * The users are invited to the "Streams/experience/main" experience,
- * and any other experiences listed in the "Experiences" column, if it
- * is present in the CSV file.
- * @return {void}
- */
- function Streams_import_post()
- {
- $app = Q::app();
- $luid = Users::loggedInUser(true)->id;
- $communityId = Q::ifset($_REQUEST, 'communityId', Users::communityId());
-
- $all = Streams::userStreamsTree()->getAll();
- $exceptions = array();
- $users = array();
-
- $task = isset($_REQUEST['taskStreamName'])
- ? Streams::fetchOne($luid, $communityId, $_REQUEST['taskStreamName'], true)
- : Streams::create($luid, $communityId, 'Streams/task', array(
- 'skipAccess' => true,
- 'title' => 'Importing members into ' . Users::communityName()
- ), array(
- 'publisherId' => $app,
- 'streamName' => "Streams/tasks/app",
- 'type' => 'Streams/import'
- ));
- $task->addPreloaded();
- Q_Response::setSlot('taskStreamName', $task->name);
-
- // TODO: make worker php scripts that loop and find task streams
- // which have not been finished yet, start a transaction to
- // mark their state "progress", and start processing the items.
- // Also have way to inquire about whether a task is in progress,
- // and if no response within a certain timeout, mark it as paused,
- // available for any other worker to resume making progress on it.
-
- // store the instructions
- if (!empty($_FILES)) {
- $file = reset($_FILES);
- $tmp = $file['tmp_name'];
- $task->instructions = file_get_contents($tmp);
- $task->save();
- unlink($tmp);
- }
- if (!$task->instructions) {
- return;
- }
- $instructions = $task->instructions;
-
- // Send the response and keep going.
- // WARN: this potentially ties up the PHP thread for a long time
- $timeLimit = Q_Config::get('Streams', 'import', 'timeLimit', 100000);
- ignore_user_abort(true);
- set_time_limit($timeLimit);
- Q_Dispatcher::response(true);
- session_write_close();
-
- // count the number of rows
- $lineCount = substr_count($instructions, PHP_EOL);
- $task->setAttribute('items', $lineCount);
-
- // start parsing the rows
- $j = 0;
- foreach (Q_Utils::csvLines($instructions) as $line) {
- $row = str_getcsv($line, ",");
- if (!$row) {
- continue;
- }
- if (++$j === 1) {
- // get the fields from the first row
- $fields = str_getcsv($instructions, ',');
- if ($fields === false) {
- return;
- }
- $emailAddressKey = Q_Utils::normalize('Email Address');
- $mobileNumberKey = Q_Utils::normalize('Mobile Number');
- $processed = $task->getAttribute('processed', 0);
- continue;
- }
- if ($j <= $processed) {
- continue;
- }
- $notEmpty = false;
- foreach ($row as $v) {
- if ($v) {
- $notEmpty = true;
- break;
- }
- }
- if (!$notEmpty) {
- continue;
- }
-
- // get the data from the row
- $data = array();
- $importUserData = array();
- $streamNames = array();
- foreach ($row as $i => $value) {
- $field = $fields[$i];
- $fn = Q_Utils::normalize($field);
- $data[$fn] = $value;
- if ($fn === 'experiences') {
- $experiences = explode("\n", $value);
- foreach ($experiences as $experience) {
- $streams = Streams::lookup($communityId, 'Streams/experience', $value);
- $stream = reset($streams);
- if (!$stream) {
- $vn = Q_Utils::normalize($value);
- $stream = Streams::fetchOne($luid, $communityId, "Streams/experience/$vn");
- if (!$stream) {
- $stream = Streams::create($luid, $communityId, 'Streams/experience', array(
- 'name' => "Streams/experience/$vn",
- 'title' => $value
- ));
- }
- }
- $streamNames[] = $stream->name;
- }
- continue;
- }
- if ($fn === 'labels') {
- $labelTitles = explode("\n", $value);
- continue;
- }
- foreach ($all as $n => $info) {
- if ($fn === Q_Utils::normalize($n)
- or $fn === Q_Utils::normalize(Q::ifset($info, 'title', ''))) {
- $importUserData[$n] = $value;
- break;
- }
- }
- }
-
- try {
- $streams = Streams::fetch($luid, $communityId, $streamNames);
- foreach ($streams as $stream) {
- if (!$stream->testAdminLevel('manage')) {
- throw new Users_Exception_NotAuthorized();
- }
- }
-
- // prepare the identifier
- Users::$cache['importUserData'] = $importUserData;
- $emailAddress = Q::ifset($data, $emailAddressKey, null);
- $mobileNumber = Q::ifset($data, $mobileNumberKey, null);
- if ($mobileNumber and $emailAddress) {
- if (Users::identify('mobile', $mobileNumber)) {
- $identifier = $mobileNumber;
- $alsoAddEmail = $emailAddress;
- } else if (Users::identify('email', $emailAddress)) {
- $identifier = $emailAddress;
- $alsoAddMobile = $mobileNumber;
- } else {
- $identifier = $mobileNumber;
- $alsoAddEmail = $emailAddress;
- }
- } else if ($mobileNumber) {
- $identifier = $mobileNumber;
- } else if ($emailAddress) {
- $identifier = $emailAddress;
- } else {
- continue; // no one to invite
- }
- $identifier = $mobileNumber ? $mobileNumber : $emailAddress;
-
- // invite the user
- $sn = 'Streams/experience/main';
- $result = Streams::invite($communityId, $sn, compact('identifier'));
- Users::$cache['importUserData'] = null; // already saved this data
- $userId = reset($result['userIds']);
- $users[$userId] = $user = Users::fetch($userId, true);
- if (isset($alsoAddEmail)) {
- $user->addEmail($alsoAddEmail); // sends addEmail message
- }
- if (isset($alsoAddMobile)) {
- $user->addMobile($alsoAddMobile); // sends addMobile message
- }
- $task->setAttribute('processed', $j);
- $task->setAttribute('progress', min($j / $lineCount, 1));
- $task->post($luid, array(
- 'type' => 'Streams/task/progress',
- 'instructions' => compact('mobileNumber', 'emailAddress', 'user', 'processed', 'progress'),
- ), true);
- foreach ($streamNames as $sn) {
- // the following sends an invite message and link by email or mobile
- Streams::invite($communityId, $sn, compact('identifier'));
- }
- if (!empty($labelTitles)) {
- $labels = Users_Label::fetch($luid);
- foreach ($labelTitles as $title) {
- $found = false;
- foreach ($labels as $label) {
- if ($label->title = $title) {
- $found = true;
- break;
- }
- }
- $tn = Q_Utils::normalize($title);
- $label = "Users/$tn";
- if (!$found) {
- Users_Label::addLabel($label, $communityId, $title, $luid);
- }
- Users_Contact::addContact($communityId, $label, $userId, null, $luid);
- }
- }
- } catch (Exception $e) {
- $exceptions[$j] = $e;
- }
- }
- // if we reached here, then the task has completed
- $task->instructions = '';
- $task->post($luid, array(
- 'type' => 'Streams/task/complete'
- ), true);
- }
-