platform/plugins/Streams/handlers/Streams/import/post.php - Q Platform
Show:

File: platform/plugins/Streams/handlers/Streams/import/post.php

  1. <?php
  2.  
  3. /**
  4. * @module Streams
  5. */
  6.  
  7. /**
  8. * Can be used by $app/admins of a community.
  9. * Starts a Streams/task to create users from an uploaded csv file, and
  10. * invite them to the "Streams/experience/main" stream, as well as any
  11. * other experience streams named in the csv.
  12. * @class HTTP Streams import
  13. * @method post
  14. * @param {array} [$_REQUEST]
  15. * @param {string} [$_REQUEST.communityId=Users::communityId] If you want to override it
  16. * @param {string} [$_REQUEST.taskStreamName] Pass the name of a task stream to resume it.
  17. * In this case, you don't need to pass the file, because it was saved.
  18. * @param {array} [$_FILES] Array consisting of one or more CSV files.
  19. * The first line consists of titles or names of streams loaded from
  20. * JSON files named under Streams/userStreams config.
  21. * The users are invited to the "Streams/experience/main" experience,
  22. * and any other experiences listed in the "Experiences" column, if it
  23. * is present in the CSV file.
  24. * @return {void}
  25. */
  26. function Streams_import_post()
  27. {
  28. $app = Q::app();
  29. $luid = Users::loggedInUser(true)->id;
  30. $communityId = Q::ifset($_REQUEST, 'communityId', Users::communityId());
  31.  
  32. $all = Streams::userStreamsTree()->getAll();
  33. $exceptions = array();
  34. $users = array();
  35.  
  36. $task = isset($_REQUEST['taskStreamName'])
  37. ? Streams::fetchOne($luid, $communityId, $_REQUEST['taskStreamName'], true)
  38. : Streams::create($luid, $communityId, 'Streams/task', array(
  39. 'skipAccess' => true,
  40. 'title' => 'Importing members into ' . Users::communityName()
  41. ), array(
  42. 'publisherId' => $app,
  43. 'streamName' => "Streams/tasks/app",
  44. 'type' => 'Streams/import'
  45. ));
  46. $task->addPreloaded();
  47. Q_Response::setSlot('taskStreamName', $task->name);
  48.  
  49. // TODO: make worker php scripts that loop and find task streams
  50. // which have not been finished yet, start a transaction to
  51. // mark their state "progress", and start processing the items.
  52. // Also have way to inquire about whether a task is in progress,
  53. // and if no response within a certain timeout, mark it as paused,
  54. // available for any other worker to resume making progress on it.
  55.  
  56. // store the instructions
  57. if (!empty($_FILES)) {
  58. $file = reset($_FILES);
  59. $tmp = $file['tmp_name'];
  60. $task->instructions = file_get_contents($tmp);
  61. $task->save();
  62. unlink($tmp);
  63. }
  64. if (!$task->instructions) {
  65. return;
  66. }
  67. $instructions = $task->instructions;
  68.  
  69. // Send the response and keep going.
  70. // WARN: this potentially ties up the PHP thread for a long time
  71. $timeLimit = Q_Config::get('Streams', 'import', 'timeLimit', 100000);
  72. ignore_user_abort(true);
  73. set_time_limit($timeLimit);
  74. Q_Dispatcher::response(true);
  75. session_write_close();
  76. // count the number of rows
  77. $lineCount = substr_count($instructions, PHP_EOL);
  78. $task->setAttribute('items', $lineCount);
  79.  
  80. // start parsing the rows
  81. $j = 0;
  82. foreach (Q_Utils::csvLines($instructions) as $line) {
  83. $row = str_getcsv($line, ",");
  84. if (!$row) {
  85. continue;
  86. }
  87. if (++$j === 1) {
  88. // get the fields from the first row
  89. $fields = str_getcsv($instructions, ',');
  90. if ($fields === false) {
  91. return;
  92. }
  93. $emailAddressKey = Q_Utils::normalize('Email Address');
  94. $mobileNumberKey = Q_Utils::normalize('Mobile Number');
  95. $processed = $task->getAttribute('processed', 0);
  96. continue;
  97. }
  98. if ($j <= $processed) {
  99. continue;
  100. }
  101. $notEmpty = false;
  102. foreach ($row as $v) {
  103. if ($v) {
  104. $notEmpty = true;
  105. break;
  106. }
  107. }
  108. if (!$notEmpty) {
  109. continue;
  110. }
  111.  
  112. // get the data from the row
  113. $data = array();
  114. $importUserData = array();
  115. $streamNames = array();
  116. foreach ($row as $i => $value) {
  117. $field = $fields[$i];
  118. $fn = Q_Utils::normalize($field);
  119. $data[$fn] = $value;
  120. if ($fn === 'experiences') {
  121. $experiences = explode("\n", $value);
  122. foreach ($experiences as $experience) {
  123. $streams = Streams::lookup($communityId, 'Streams/experience', $value);
  124. $stream = reset($streams);
  125. if (!$stream) {
  126. $vn = Q_Utils::normalize($value);
  127. $stream = Streams::fetchOne($luid, $communityId, "Streams/experience/$vn");
  128. if (!$stream) {
  129. $stream = Streams::create($luid, $communityId, 'Streams/experience', array(
  130. 'name' => "Streams/experience/$vn",
  131. 'title' => $value
  132. ));
  133. }
  134. }
  135. $streamNames[] = $stream->name;
  136. }
  137. continue;
  138. }
  139. if ($fn === 'labels') {
  140. $labelTitles = explode("\n", $value);
  141. continue;
  142. }
  143. foreach ($all as $n => $info) {
  144. if ($fn === Q_Utils::normalize($n)
  145. or $fn === Q_Utils::normalize(Q::ifset($info, 'title', ''))) {
  146. $importUserData[$n] = $value;
  147. break;
  148. }
  149. }
  150. }
  151.  
  152. try {
  153. $streams = Streams::fetch($luid, $communityId, $streamNames);
  154. foreach ($streams as $stream) {
  155. if (!$stream->testAdminLevel('manage')) {
  156. throw new Users_Exception_NotAuthorized();
  157. }
  158. }
  159.  
  160. // prepare the identifier
  161. Users::$cache['importUserData'] = $importUserData;
  162. $emailAddress = Q::ifset($data, $emailAddressKey, null);
  163. $mobileNumber = Q::ifset($data, $mobileNumberKey, null);
  164. if ($mobileNumber and $emailAddress) {
  165. if (Users::identify('mobile', $mobileNumber)) {
  166. $identifier = $mobileNumber;
  167. $alsoAddEmail = $emailAddress;
  168. } else if (Users::identify('email', $emailAddress)) {
  169. $identifier = $emailAddress;
  170. $alsoAddMobile = $mobileNumber;
  171. } else {
  172. $identifier = $mobileNumber;
  173. $alsoAddEmail = $emailAddress;
  174. }
  175. } else if ($mobileNumber) {
  176. $identifier = $mobileNumber;
  177. } else if ($emailAddress) {
  178. $identifier = $emailAddress;
  179. } else {
  180. continue; // no one to invite
  181. }
  182. $identifier = $mobileNumber ? $mobileNumber : $emailAddress;
  183.  
  184. // invite the user
  185. $sn = 'Streams/experience/main';
  186. $result = Streams::invite($communityId, $sn, compact('identifier'));
  187. Users::$cache['importUserData'] = null; // already saved this data
  188. $userId = reset($result['userIds']);
  189. $users[$userId] = $user = Users::fetch($userId, true);
  190. if (isset($alsoAddEmail)) {
  191. $user->addEmail($alsoAddEmail); // sends addEmail message
  192. }
  193. if (isset($alsoAddMobile)) {
  194. $user->addMobile($alsoAddMobile); // sends addMobile message
  195. }
  196. $task->setAttribute('processed', $j);
  197. $task->setAttribute('progress', min($j / $lineCount, 1));
  198. $task->post($luid, array(
  199. 'type' => 'Streams/task/progress',
  200. 'instructions' => compact('mobileNumber', 'emailAddress', 'user', 'processed', 'progress'),
  201. ), true);
  202. foreach ($streamNames as $sn) {
  203. // the following sends an invite message and link by email or mobile
  204. Streams::invite($communityId, $sn, compact('identifier'));
  205. }
  206. if (!empty($labelTitles)) {
  207. $labels = Users_Label::fetch($luid);
  208. foreach ($labelTitles as $title) {
  209. $found = false;
  210. foreach ($labels as $label) {
  211. if ($label->title = $title) {
  212. $found = true;
  213. break;
  214. }
  215. }
  216. $tn = Q_Utils::normalize($title);
  217. $label = "Users/$tn";
  218. if (!$found) {
  219. Users_Label::addLabel($label, $communityId, $title, $luid);
  220. }
  221. Users_Contact::addContact($communityId, $label, $userId, null, $luid);
  222. }
  223. }
  224. } catch (Exception $e) {
  225. $exceptions[$j] = $e;
  226. }
  227. }
  228. // if we reached here, then the task has completed
  229. $task->instructions = '';
  230. $task->post($luid, array(
  231. 'type' => 'Streams/task/complete'
  232. ), true);
  233. }