1<?php 2 3//# temporarily remove this check, to make sure processing the queue with a remote call continues to work 4//# https://mantis.phplist.com/view.php?id=17316 5//verifyCsrfGetToken(); 6 7if (isset($_GET['login']) || isset($_GET['password'])) { 8 echo Error(s('Remote processing of the queue is now handled with a processing secret')); 9 10 return; 11} 12 13if ($inRemoteCall) { 14 // check that we actually still want remote queue processing 15 $pqChoice = getConfig('pqchoice'); 16 if (SHOW_PQCHOICE && $pqChoice != 'phplistdotcom') { 17 $counters['campaigns'] = 0; 18 echo outputCounters(); 19 exit; 20 } 21} else { 22 // we're in a normal session, so the csrf token should work 23 verifyCsrfGetToken(); 24} 25 26require_once dirname(__FILE__).'/../accesscheck.php'; 27require_once dirname(__FILE__).'/../sendemaillib.php'; 28 29$status = 'OK'; 30$processqueue_timer = new timer(); 31$domainthrottle = array(); 32// check for other processes running 33 34if ((!empty($GLOBALS['commandline']) && isset($cline['f'])) || $inRemoteCall) { 35 // force set, so kill other processes 36 cl_output('Force set, killing other send processes'); 37 $send_process_id = getPageLock(1); 38} else { 39 $send_process_id = getPageLock(); 40} 41if (empty($send_process_id)) { 42 processQueueOutput(s('Unable get lock for processing')); 43 $status = s('Error processing'); 44 45 return; 46} 47 48$mm = inMaintenanceMode(); 49if (!empty($mm)) { 50 processQueueOutput(s('The system is in maintenance mode, stopping. Try again later.')); 51 $status = s('In maintenance mode, try again later.'); 52 releaseLock($send_process_id); 53 return; 54} 55 56//cl_output('page locked on '.$send_process_id); 57 58if (empty($GLOBALS['commandline']) && isset($_GET['reload'])) { 59 $reload = sprintf('%d', $_GET['reload']); 60} else { 61 $reload = 0; 62} 63 64//# this one sends a notification to plugins that processing has started 65foreach ($GLOBALS['plugins'] as $pluginname => $plugin) { 66 $plugin->processQueueStart(); 67} 68 69//# let's make sure all subscribers have a uniqid 70//# only when on CL 71if ($GLOBALS['commandline']) { 72 $req = Sql_Query(sprintf('select id from %s where uniqid is NULL or uniqid = ""', $GLOBALS['tables']['user'])); 73 $num = Sql_Affected_Rows(); 74 if ($num) { 75 cl_output(s('Giving a Unique ID to %d subscribers, this may take a while', $num)); 76 while ($row = Sql_Fetch_Row($req)) { 77 Sql_query(sprintf('update %s set uniqid = "%s" where id = %d', $GLOBALS['tables']['user'], getUniqID(), $row[0])); 78 } 79 } 80} 81// make sure subscribers have a UUID. They may not when created via eg the API 82$req = Sql_Query(sprintf('select id from %s where uuid is NULL or uuid = ""', $GLOBALS['tables']['user'])); 83$num = Sql_Affected_Rows(); 84if ($num) { 85 cl_output(s('Giving a UUID to %d subscribers, this may take a while', $num)); 86 processQueueOutput(s('Giving a UUID to %d subscribers, this may take a while', $num)); 87 while ($row = Sql_Fetch_Row($req)) { 88 Sql_query(sprintf('update %s set uuid = "%s" where id = %d', $GLOBALS['tables']['user'], (string) uuid::generate(4), $row[0])); 89 } 90} 91// make sure campaigns have a UUID. They may not when created via eg the API 92$req = Sql_Query(sprintf('select id from %s where uuid is NULL or uuid = ""', $GLOBALS['tables']['message'])); 93$num = Sql_Affected_Rows(); 94if ($num) { 95 cl_output(s('Giving a UUID to %d campaigns', $num)); 96 while ($row = Sql_Fetch_Row($req)) { 97 Sql_query(sprintf('update %s set uuid = "%s" where id = %d', $GLOBALS['tables']['message'], (string) uuid::generate(4), $row[0])); 98 } 99} 100 101// keep for now, just in case 102if (!Sql_Table_exists($GLOBALS['tables']['user_message_view'])) { 103 cl_output(s('Creating new table "user_message_view"')); 104 createTable('user_message_view'); 105} 106 107$counters['num_per_batch'] = 0; 108$batch_period = 0; 109$script_stage = 0; // start 110$someusers = $skipped = 0; 111 112$maxbatch = -1; 113$minbatchperiod = -1; 114// check for batch limits 115$ISPrestrictions = ''; 116$ISPlockfile = ''; 117//$rssitems = array(); //Obsolete by rssmanager plugin 118$user_attribute_query = ''; 119$lastsent = !empty($_GET['lastsent']) ? sprintf('%d', $_GET['lastsent']) : 0; 120$lastskipped = !empty($_GET['lastskipped']) ? sprintf('%d', $_GET['lastskipped']) : 0; 121 122if ($fp = @fopen('/etc/phplist.conf', 'r')) { 123 $contents = fread($fp, filesize('/etc/phplist.conf')); 124 fclose($fp); 125 $lines = explode("\n", $contents); 126 $ISPrestrictions = $GLOBALS['I18N']->get('The following restrictions have been set by your ISP:')."\n"; 127 foreach ($lines as $line) { 128 list($key, $val) = explode('=', $line); 129 130 switch ($key) { 131 case 'maxbatch': 132 $maxbatch = sprintf('%d', $val); 133 $ISPrestrictions .= "$key = $val\n"; 134 break; 135 case 'minbatchperiod': 136 $minbatchperiod = sprintf('%d', $val); 137 $ISPrestrictions .= "$key = $val\n"; 138 break; 139 case 'lockfile': 140 $ISPlockfile = $val; 141 } 142 } 143} 144if (MAILQUEUE_BATCH_SIZE) { 145 if ($maxbatch > 0) { 146 $counters['num_per_batch'] = min(MAILQUEUE_BATCH_SIZE, $maxbatch); 147 } else { 148 $counters['num_per_batch'] = sprintf('%d', MAILQUEUE_BATCH_SIZE); 149 } 150 if (MAILQUEUE_BATCH_PERIOD) { 151 if ($minbatchperiod > 0) { 152 $batch_period = max(MAILQUEUE_BATCH_PERIOD, $minbatchperiod); 153 } else { 154 $batch_period = MAILQUEUE_BATCH_PERIOD; 155 } 156 } 157} else { 158 if ($maxbatch > 0) { 159 $counters['num_per_batch'] = $maxbatch; 160 } 161} 162 163//# force batch processing in small batches when called from the web interface 164/* 165 * bad idea, we shouldn't touch the batch settings, in case they are very specific for 166 * ISP restrictions, instead limit webpage processing by time (below) 167 * 168if (empty($GLOBALS['commandline'])) { 169 $counters['num_per_batch'] = min($counters['num_per_batch'],100); 170 $batch_period = max($batch_period,1); 171} elseif (isset($cline['m'])) { 172 $cl_num_per_batch = sprintf('%d',$cline['m']); 173 ## don't block when the param is not a number 174 if (!empty($cl_num_per_batch)) { 175 $counters['num_per_batch'] = $cl_num_per_batch; 176 } 177 cl_output("Batch set with commandline to ".$counters['num_per_batch']); 178} 179*/ 180$maxProcessQueueTime = 0; 181if (defined('MAX_PROCESSQUEUE_TIME') && MAX_PROCESSQUEUE_TIME > 0) { 182 $maxProcessQueueTime = (int) MAX_PROCESSQUEUE_TIME; 183} 184// in-page processing force to a minute max, and make sure there's a batch size 185if (empty($GLOBALS['commandline'])) { 186 $maxProcessQueueTime = min($maxProcessQueueTime, 60); 187 if ($counters['num_per_batch'] <= 0) { 188 $counters['num_per_batch'] = 10000; 189 } 190} 191 192if (VERBOSE && $maxProcessQueueTime) { 193 processQueueOutput(s('Maximum time for queue processing').': '.$maxProcessQueueTime, 'progress'); 194} 195 196if (isset($cline['m'])) { 197 cl_output('Max to send is '.$cline['m'].' num per batch is '.$counters['num_per_batch']); 198 $clinemax = (int) $cline['m']; 199 //# slow down just before max 200 if ($clinemax < 20) { 201 $counters['num_per_batch'] = min(2, $clinemax, $counters['num_per_batch']); 202 } elseif ($clinemax < 200) { 203 $counters['num_per_batch'] = min(20, $clinemax, $counters['num_per_batch']); 204 } else { 205 $counters['num_per_batch'] = min($clinemax, $counters['num_per_batch']); 206 } 207 cl_output('Max to send is '.$cline['m'].' setting num per batch to '.$counters['num_per_batch']); 208} 209 210$original_num_per_batch = $counters['num_per_batch']; 211if ($counters['num_per_batch'] && $batch_period) { 212 // check how many were sent in the last batch period and take off that 213 // amount from this batch 214 /* 215 processQueueOutput(sprintf('select count(*) from %s where entered > date_sub(now(),interval %d second) and status = "sent"', 216 $tables["usermessage"],$batch_period)); 217 */ 218 $recently_sent = Sql_Fetch_Row_Query(sprintf('select count(*) from %s where entered > date_sub(now(),interval %d second) and status = "sent"', 219 $tables['usermessage'], $batch_period)); 220 cl_output('Recently sent : '.$recently_sent[0]); 221 $counters['num_per_batch'] -= $recently_sent[0]; 222 223 // if this ends up being 0 or less, don't send anything at all 224 if ($counters['num_per_batch'] == 0) { 225 $counters['num_per_batch'] = -1; 226 } 227} 228// output some stuff to make sure it's not buffered in the browser 229for ($i = 0; $i < 10000; ++$i) { 230 echo ' '; 231 if ($i % 100 == 0) { 232 echo "\n"; 233 } 234} 235echo '<style type="text/css" src="css/app.css"></style>'; 236echo '<style type="text/css" src="ui/'.$GLOBALS['ui'].'/css/style.css"></style>'; 237echo '<script type="text/javascript" src="js/'.$GLOBALS['jQuery'].'"></script>'; 238//# not sure this works, but would be nice 239echo '<script type="text/javascript">$("#favicon").attr("href","images/busy.gif");</script>'; 240 241flush(); 242// report keeps track of what is going on 243$report = ''; 244$nothingtodo = 0; 245$cached = array(); // cache the message from the database to avoid reloading it every time 246 247function my_shutdown() 248{ 249 global $script_stage, $reload; 250// processQueueOutput( "Script status: ".connection_status(),0); # with PHP 4.2.1 buggy. http://bugs.php.net/bug.php?id=17774 251 processQueueOutput(s('Script stage').': '.$script_stage, 0, 'progress'); 252 global $counters, $report, $send_process_id, $tables, $nothingtodo, $processed, $notsent, $unconfirmed, $batch_period; 253 $some = $processed; 254 $delaytime = 0; 255 if (!$some) { 256 processQueueOutput($GLOBALS['I18N']->get('Finished, Nothing to do'), 0, 'progress'); 257 $nothingtodo = 1; 258 } 259 260 $totaltime = $GLOBALS['processqueue_timer']->elapsed(1); 261 if ($totaltime > 0) { 262 $msgperhour = (3600 / $totaltime) * $counters['sent']; 263 } else { 264 $msgperhour = s('Calculating'); 265 } 266 if ($counters['sent']) { 267 processQueueOutput(sprintf('%d %s %01.2f %s (%d %s)', $counters['sent'], 268 $GLOBALS['I18N']->get('messages sent in'), 269 $totaltime, $GLOBALS['I18N']->get('seconds'), $msgperhour, $GLOBALS['I18N']->get('msgs/hr')), 270 $counters['sent'], 'progress'); 271 } 272 if ($counters['invalid']) { 273 processQueueOutput(s('%d invalid email addresses', $counters['invalid']), 1, 'progress'); 274 } 275 if ($counters['failed_sent']) { 276 processQueueOutput(s('%d failed (will retry later)', $counters['failed_sent']), 1, 'progress'); 277 foreach ($counters as $label => $value) { 278 // processQueueOutput(sprintf('%d %s',$value,$GLOBALS['I18N']->get($label)),1,'progress'); 279 cl_output(sprintf('%d %s', $value, $GLOBALS['I18N']->get($label))); 280 } 281 } 282 if ($unconfirmed) { 283 processQueueOutput(sprintf($GLOBALS['I18N']->get('%d emails unconfirmed (not sent)'), $unconfirmed), 1, 284 'progress'); 285 } 286 287 foreach ($GLOBALS['plugins'] as $pluginname => $plugin) { 288 $plugin->processSendStats($counters['sent'], $counters['invalid'], $counters['failed_sent'], $unconfirmed, 289 $counters); 290 } 291 292 flushClickTrackCache(); 293 releaseLock($send_process_id); 294 295 finish('info', $report, $script_stage); 296 if ($script_stage < 5 && !$nothingtodo) { 297 processQueueOutput($GLOBALS['I18N']->get('Warning: script never reached stage 5')."\n".$GLOBALS['I18N']->get('This may be caused by a too slow or too busy server')." \n"); 298 } elseif ($script_stage == 5 && (!$nothingtodo || isset($GLOBALS['wait']))) { 299 // if the script timed out in stage 5, reload the page to continue with the rest 300 ++$reload; 301 if (!$GLOBALS['commandline'] && $counters['num_per_batch'] && $batch_period) { 302 if ($counters['sent'] + 10 < $GLOBALS['original_num_per_batch']) { 303 processQueueOutput($GLOBALS['I18N']->get('Less than batch size were sent, so reloading imminently'), 1, 304 'progress'); 305 $counters['delaysend'] = 10; 306 } else { 307 $counters['delaysend'] = (int) ($batch_period - $totaltime); 308 $delaytime = 30; //# actually with the iframe we can reload fairly quickly 309 processQueueOutput(s('Waiting for %d seconds before reloading', $delaytime), 1, 'progress'); 310 } 311 } 312 $counters['delaysend'] = (int) ($batch_period - $totaltime); 313 if (empty($GLOBALS['inRemoteCall']) && empty($GLOBALS['commandline'])) { 314 if (defined('JSLEEPMETHOD')) { 315 printf('<script type="text/javascript"> 316 setTimeout(function() { 317 document.location = "./?page=pageaction&action=processqueue&ajaxed=true&reload=%d&lastsent=%d&lastskipped=%d%s"; 318 }, %d); 319 </script></body></html>', $reload, $counters['sent'], $notsent, addCsrfGetToken(), $delaytime*1000); 320 } else { 321 sleep($delaytime); 322 printf('<script type="text/javascript"> 323 document.location = "./?page=pageaction&action=processqueue&ajaxed=true&reload=%d&lastsent=%d&lastskipped=%d%s"; 324 </script></body></html>', $reload, $counters['sent'], $notsent, addCsrfGetToken()); 325 } 326 } 327 } elseif ($script_stage == 6 || $nothingtodo) { 328 foreach ($GLOBALS['plugins'] as $pluginname => $plugin) { 329 $plugin->messageQueueFinished(); 330 } 331 processQueueOutput($GLOBALS['I18N']->get('Finished, All done'), 0); 332 echo '<script type="text/javascript"> 333 var parentJQuery = window.parent.jQuery; 334 window.parent.allDone("' .s('All done').'"); 335 </script>'; 336 } else { 337 processQueueOutput(s('Script finished, but not all messages have been sent yet.')); 338 } 339 if (!empty($GLOBALS['inRemoteCall'])) { 340 ob_end_clean(); 341 echo outputCounters(); 342 @ob_start(); 343 } 344 345 if (empty($GLOBALS['inRemoteCall']) && empty($GLOBALS['commandline']) && empty($_GET['ajaxed'])) { 346 return; 347 } elseif (!empty($GLOBALS['inRemoteCall']) || !empty($GLOBALS['commandline'])) { 348 @ob_end_clean(); 349 } 350 exit; 351} 352 353register_shutdown_function('my_shutdown'); 354 355//# some general functions 356function finish($flag, $message, $script_stage) 357{ 358 global $nothingtodo, $counters, $messageid; 359 if ($flag == 'error') { 360 $subject = s('Message queue processing errors'); 361 } elseif ($flag == 'info') { 362 $subject = s('Message queue processing report'); 363 } 364 if (!$nothingtodo && !$GLOBALS['inRemoteCall']) { 365 processQueueOutput(s('Finished this run'), 1, 'progress'); 366 echo '<script type="text/javascript"> 367 var parentJQuery = window.parent.jQuery; 368 parentJQuery("#progressmeter").updateSendProgress("' .$counters['sent'].','.$counters['total_users_for_message '.$messageid].'"); 369 </script>'; 370 } 371 if (!$GLOBALS['inRemoteCall'] && !TEST && !$nothingtodo && SEND_QUEUE_PROCESSING_REPORT) { 372 $reportSent = false; 373 374 // Execute plugin hooks for sending report 375 // @@TODO work out a way to deal with the order of processing the plugins 376 // as that can make a difference here. 377 foreach ($GLOBALS['plugins'] as $pluginname => $plugin) { 378 if (!$reportSent) { 379 $reportSent = $plugin->sendReport($subject, $message); 380 } 381 } 382 383 // If plugins have not sent the report, send it the default way 384 if (!$reportSent) { 385 $messageWithIntro = s('The following events occured while processing the message queue:')."\n".$message; 386 $messageWithIntroAndFooter = $messageWithIntro."\n\n".s('To stop receiving these reports read:').' https://resources.phplist.com/system/config/send_queue_processing_report'."\n\n"; 387 sendReport($subject, $messageWithIntroAndFooter); 388 } 389 } 390} 391 392function ProcessError($message) 393{ 394 global $report; 395 $report .= $message; 396 processQueueOutput("Error: $message"); 397 exit; 398} 399 400function processQueueOutput($message, $logit = 1, $target = 'summary') 401{ 402 global $report, $shadecount, $counters, $messageid; 403 if (isset($counters['total_users_for_message '.$messageid])) { 404 $total = $counters['total_users_for_message '.$messageid]; 405 } else { 406 $total = 0; 407 } 408 if (!isset($shadecount)) { 409 $shadecount = 0; 410 } 411 if (is_array($message)) { 412 $tmp = ''; 413 foreach ($message as $key => $val) { 414 $tmp .= $key.'='.$val.'; '; 415 } 416 $message = $tmp; 417 } 418 if (!empty($GLOBALS['commandline'])) { 419 cl_output(strip_tags($message).' ['.$GLOBALS['processqueue_timer']->interval(1).'] ('.$GLOBALS['pagestats']['number_of_queries'].')'); 420 $infostring = '['.date('D j M Y H:i', time()).'] [CL]'; 421 } elseif ($GLOBALS['inRemoteCall']) { 422 //# with a remote call we suppress output 423 @ob_end_clean(); 424 $infostring = ''; 425 $message = ''; 426 @ob_start(); 427 428 return; 429 } else { 430 $infostring = '['.date('D j M Y H:i', time()).'] ['.getClientIP().']'; 431 //print "$infostring $message<br/>\n"; 432 $lines = explode("\n", $message); 433 foreach ($lines as $line) { 434 $line = preg_replace('/"/', '\"', $line); 435 436 //# contribution in forums, http://forums.phplist.com/viewtopic.php?p=14648 437 //Replace the "’" which is not replaced by html_decode 438 $line = preg_replace('/’/', "'", $line); 439 //Decode HTML chars 440 $line = html_entity_decode($line, ENT_QUOTES, 'UTF-8'); 441 442 echo "\n".'<div class="output shade'.$shadecount.'">'.$line.'</div>'; 443 $line = str_replace("'", "\'", $line); // #16880 - avoid JS error 444 echo '<script type="text/javascript"> 445 var parentJQuery = window.parent.jQuery; 446 parentJQuery("#processqueue' .$target.'").append(\'<div class="output shade'.$shadecount.'">'.$line.'</div>\'); 447 parentJQuery("#processqueue' .$target.'").animate({scrollTop:100000}, "slow"); 448 </script>'; 449 $shadecount = !$shadecount; 450 for ($i = 0; $i < 10000; ++$i) { 451 echo ' '; 452 if ($i % 100 == 0) { 453 echo "\n"; 454 } 455 } 456 } 457 @ob_flush(); 458 flush(); 459 } 460 461 $report .= "\n$infostring $message"; 462 if ($logit) { 463 logEvent($message); 464 } 465 flush(); 466} 467 468function outputCounters() 469{ 470 global $counters; 471 $result = ''; 472 if (function_exists('json_encode')) { // only PHP5.2.0 and up 473 return json_encode($counters); 474 } else { 475 //# keep track of which php versions we need to continue to support 476 $counters['PHPVERSION'] = phpversion(); 477 foreach ($counters as $key => $val) { 478 $result .= $key.'='.$val.';'; 479 } 480 481 return $result; 482 } 483} 484 485function sendEmailTest($messageid, $email) 486{ 487 global $report; 488 if (VERBOSE) { 489 processQueueOutput($GLOBALS['I18N']->get('(test)').' '.$GLOBALS['I18N']->get('Would have sent').' '.$messageid.$GLOBALS['I18N']->get('to').' '.$email); 490 } else { 491 $report .= "\n".$GLOBALS['I18N']->get('(test)').' '.$GLOBALS['I18N']->get('Would have sent').' '.$messageid.$GLOBALS['I18N']->get('to').' '.$email; 492 } 493 // fake a bit of a delay, 494 usleep(0.75 * 1000000); 495 // and say it was fine. 496 return true; 497} 498 499// we don not want to timeout or abort 500$abort = ignore_user_abort(1); 501set_time_limit(600); 502flush(); 503 504if (empty($reload)) { //# only show on first load 505 processQueueOutput($GLOBALS['I18N']->get('Started'), 0); 506 if (defined('SYSTEM_TIMEZONE')) { 507 processQueueOutput($GLOBALS['I18N']->get('Time now ').date('Y-m-d H:i')); 508 } 509} 510//processQueueOutput('Will process for a maximum of '.$maxProcessQueueTime.' seconds '.MAX_PROCESSQUEUE_TIME); 511 512//# ask plugins if processing is allowed at all 513foreach ($GLOBALS['plugins'] as $pluginname => $plugin) { 514 // cl_output('Asking '.$pluginname); 515 if (!$plugin->allowProcessQueue()) { 516 processQueueOutput(s('Processing blocked by plugin %s', $pluginname)); 517 finish('info', s('Processing blocked by plugin %s', $pluginname)); 518 exit; 519 } 520} 521 522if (empty($reload)) { //# only show on first load 523 if (!empty($ISPrestrictions)) { 524 processQueueOutput($ISPrestrictions); 525 } 526 if (is_file($ISPlockfile)) { 527 ProcessError(s('Processing has been suspended by your ISP, please try again later'), 1); 528 } 529} 530 531if ($counters['num_per_batch'] > 0) { 532 if ($original_num_per_batch != $counters['num_per_batch']) { 533 if (empty($reload)) { 534 processQueueOutput(s('Sending in batches of %s messages', number_format($original_num_per_batch)), 0); 535 } 536 $diff = $original_num_per_batch - $counters['num_per_batch']; 537 if ($diff < 0) { 538 $diff = 0; 539 } 540 processQueueOutput(s('This batch will be %s emails, because in the last %s seconds %s emails were sent', 541 number_format($counters['num_per_batch']), number_format($batch_period), number_format($diff)), 0, 'progress'); 542 } else { 543 544 processQueueOutput(s('Sending in batches of %s emails', number_format($counters['num_per_batch'])), 0, 'progress'); 545 } 546} elseif ($counters['num_per_batch'] < 0) { 547 processQueueOutput(s('In the last %s seconds more emails were sent (%s) than is currently allowed per batch (%s)', 548 number_format($batch_period), number_format($recently_sent[0]), number_format($original_num_per_batch)), 0, 'progress'); 549 $processed = 0; 550 $script_stage = 5; 551 $GLOBALS['wait'] = $batch_period; 552 553 return; 554} 555$counters['batch_total'] = $counters['num_per_batch']; 556$counters['failed_sent'] = 0; 557$counters['invalid'] = 0; 558$counters['sent'] = 0; 559 560if (0 && $reload) { 561 processQueueOutput(s('Sent in last run').": $lastsent", 0, 'progress'); 562 processQueueOutput(s('Skipped in last run').": $lastskipped", 0, 'progress'); 563} 564 565$script_stage = 1; // we are active 566$notsent = $unconfirmed = $cannotsend = 0; 567 568//# check for messages that need requeuing 569 570$req = Sql_Query(sprintf('select id from %s where requeueinterval > 0 and requeueuntil > now() and status = "sent"', 571 $tables['message'])); 572 573while ($msg = Sql_Fetch_Assoc($req)) { 574 Sql_query(sprintf( 575 'UPDATE %s 576 SET status = "submitted", 577 sendstart = null, 578 embargo = embargo + 579 INTERVAL (FLOOR(TIMESTAMPDIFF(MINUTE, embargo, GREATEST(embargo, NOW())) / requeueinterval) + 1) * requeueinterval MINUTE 580 WHERE id = %d', 581 $GLOBALS['tables']['message'], 582 $msg['id'] 583 )); 584 585 foreach ($GLOBALS['plugins'] as $pluginname => $plugin) { 586 $plugin->messageReQueued($msg['id']); 587 } 588 //# @@@ need to update message data as well 589} 590 591$messagelimit = ''; 592//# limit the number of campaigns to work on 593if (defined('MAX_PROCESS_MESSAGE')) { 594 $messagelimit = sprintf(' limit %d ', MAX_PROCESS_MESSAGE); 595} 596 597$query = ' select id from '.$tables['message'].' where status not in ("draft", "sent", "prepared", "suspended") and embargo <= now() order by entered '.$messagelimit; 598if (VERBOSE) { 599 processQueueOutput($query); 600} 601$messages = Sql_query($query); 602$num_messages = Sql_Num_Rows($messages); 603if (Sql_Has_Error($database_connection)) { 604 ProcessError(Sql_Error($database_connection)); 605} 606if ($num_messages) { 607 $counters['status'] = $num_messages; 608 if (empty($reload)) { 609 processQueueOutput($GLOBALS['I18N']->get('Processing has started,')); 610 if ($num_messages == 1) { 611 processQueueOutput(s('One campaign to process.')); 612 } else { 613 processQueueOutput(s('%d campaigns to process.', $num_messages)); 614 } 615 } 616 clearPageCache(); 617 if (!$GLOBALS['commandline'] && empty($reload)) { 618 processQueueOutput(s('Please leave this window open.').' '.s('phpList will process your queue until all messages have been sent.').' '.s('This may take a while')); 619 if (SEND_QUEUE_PROCESSING_REPORT) { 620 processQueueOutput(s('Report of processing will be sent by email')); 621 } 622 } 623} else { 624 //# check for a future embargo, to be able to report when it expires. 625 $future = Sql_Fetch_Assoc_Query('select unix_timestamp(embargo) - unix_timestamp(now()) as waittime ' 626 ." from ${tables['message']}" 627 ." where status not in ('draft', 'sent', 'prepared', 'suspended')" 628 .' and embargo > now()' 629 .' order by embargo asc limit 1'); 630 if ($future) { 631 $counters['status'] = 'embargo'; 632 $counters['delaysend'] = $future['waittime']; 633 } 634} 635 636$script_stage = 2; // we know the messages to process 637//include_once "footer.inc"; 638if (empty($counters['num_per_batch'])) { 639 $counters['num_per_batch'] = 10000000; 640} 641 642while ($message = Sql_fetch_array($messages)) { 643 ++$counters['campaign']; 644 $throttlecount = 0; 645 646 $messageid = $message['id']; 647 $counters['sent_users_for_message '.$messageid] = 0; 648 $counters['total_users_for_message '.$messageid] = 0; 649 if (PROCESSCAMPAIGNS_PARALLEL) { 650 $counters['max_users_for_message '.$messageid] = (int) $counters['num_per_batch'] / $num_messages; //# not entirely correct if a campaign has less left 651 if (VERBOSE) { 652 cl_output(s('Maximum for campaign %d is %d', $messageid, $counters['max_users_for_message '.$messageid])); 653 } 654 } else { 655 $counters['max_users_for_message '.$messageid] = 0; 656 } 657 658 $counters['processed_users_for_message '.$messageid] = 0; 659 $counters['failed_sent_for_message '.$messageid] = 0; 660 661 if (!empty($getspeedstats)) { 662 processQueueOutput('start send '.$messageid); 663 } 664 665 $msgdata = loadMessageData($messageid); 666 foreach ($GLOBALS['plugins'] as $pluginname => $plugin) { 667 $plugin->campaignStarted($msgdata); 668 } 669 670 if (!empty($msgdata['resetstats'])) { 671 resetMessageStatistics($msgdata['id']); 672 //# make sure to reset the resetstats flag, so it doesn't clear it every run 673 setMessageData($msgdata['id'], 'resetstats', 0); 674 } 675 676 //# check the end date of the campaign 677 $stopSending = false; 678 if (!empty($msgdata['finishsending'])) { 679 $finishSendingBefore = mktime($msgdata['finishsending']['hour'], $msgdata['finishsending']['minute'], 0, 680 $msgdata['finishsending']['month'], $msgdata['finishsending']['day'], $msgdata['finishsending']['year']); 681 $secondsTogo = $finishSendingBefore - time(); 682 $stopSending = $secondsTogo < 0; 683 if (empty($reload)) { 684 //## Hmm, this is probably incredibly confusing. It won't finish then 685 if (VERBOSE) { 686 processQueueOutput(sprintf($GLOBALS['I18N']->get('sending of this campaign will stop, if it is still going in %s'), 687 secs2time($secondsTogo))); 688 } 689 } 690 } 691 692 $userselection = $msgdata['userselection']; //# @@ needs more work 693 //# load message in cache 694 if (!precacheMessage($messageid)) { 695 //# precache may fail on eg invalid remote URL 696 //# any reporting needed here? 697 698 // mark the message as suspended 699 Sql_Query(sprintf('update %s set status = "suspended" where id = %d', $GLOBALS['tables']['message'], 700 $messageid)); 701 processQueueOutput(s('Error loading message, please check the eventlog for details')); 702 if (MANUALLY_PROCESS_QUEUE) { 703 // wait a little, otherwise the message won't show 704 sleep(10); 705 } 706 continue; 707 } 708 709 if (!empty($getspeedstats)) { 710 processQueueOutput('message data loaded '); 711 } 712 if (VERBOSE) { 713 // processQueueOutput($msgdata); 714 } 715 if (!empty($msgdata['notify_start']) && !isset($msgdata['start_notified'])) { 716 $notifications = explode(',', $msgdata['notify_start']); 717 foreach ($notifications as $notification) { 718 sendMail($notification, s('Campaign started'), 719 s('phplist has started sending the campaign with subject %s', $msgdata['subject'])."\n\n". 720 s('to view the progress of this campaign, go to %s://%s', $GLOBALS['admin_scheme'], 721 hostName().$GLOBALS['adminpages'].'/?page=messages&tab=active')); 722 } 723 Sql_Query(sprintf('insert ignore into %s (name,id,data) values("start_notified",%d,now())', 724 $GLOBALS['tables']['messagedata'], $messageid)); 725 } 726 727 if (empty($reload)) { 728 processQueueOutput($GLOBALS['I18N']->get('Processing message').' '.$messageid); 729 } 730 731 flush(); 732 keepLock($send_process_id); 733 $status = Sql_Query(sprintf('update %s set status = "inprocess" where id = %d', $tables['message'], $messageid)); 734 $sendstart = Sql_Query(sprintf('update %s set sendstart = now() where sendstart is null and id = %d', 735 $tables['message'], $messageid)); 736 if (empty($reload)) { 737 processQueueOutput($GLOBALS['I18N']->get('Looking for users')); 738 } 739 if (Sql_Has_Error($database_connection)) { 740 ProcessError(Sql_Error($database_connection)); 741 } 742 743 // make selection on attribute, users who at least apply to the attributes 744 // lots of ppl seem to use it as a normal mailinglist system, and do not use attributes. 745 // Check this and take anyone in that case. 746 747 //# keep an eye on how long it takes to find users, and warn if it's a long time 748 $findUserStart = $processqueue_timer->elapsed(1); 749 750 $rs = Sql_Query('select count(*) from '.$tables['attribute']); 751 $numattr = Sql_Fetch_Row($rs); 752 753 $user_attribute_query = ''; //16552 754 if ($userselection && $numattr[0]) { 755 $res = Sql_Query($userselection); 756 $counters['total_users_for_message'] = Sql_Num_Rows($res); 757 if (empty($reload)) { 758 processQueueOutput($counters['total_users_for_message'].' '.$GLOBALS['I18N']->get('users apply for attributes, now checking lists'), 759 0, 'progress'); 760 } 761 $user_list = ''; 762 while ($row = Sql_Fetch_row($res)) { 763 $user_list .= $row[0].','; 764 } 765 $user_list = substr($user_list, 0, -1); 766 if ($user_list) { 767 $user_attribute_query = " and listuser.userid in ($user_list)"; 768 } else { 769 if (empty($reload)) { 770 processQueueOutput($GLOBALS['I18N']->get('No users apply for attributes')); 771 } 772 $status = Sql_Query(sprintf('update %s set status = "sent", sent = now() where id = %d', $tables['message'], 773 $messageid)); 774 finish('info', "Message $messageid: \nNo users apply for attributes, ie nothing to do"); 775 $script_stage = 6; 776 // we should actually continue with the next message 777 return; 778 } 779 } 780 if ($script_stage < 3) { 781 $script_stage = 3; // we know the users by attribute 782 } 783 784 // when using commandline we need to exclude users who have already received 785 // the email 786 // we don't do this otherwise because it slows down the process, possibly 787 // causing us to not find anything at all 788 $exclusion = ''; 789 $doneusers = array(); 790 $skipusers = array(); 791 792//# 8478, avoid building large array in memory, when sending large amounts of users. 793 794 /* 795 $req = Sql_Query("select userid from {$tables["usermessage"]} where messageid = $messageid"); 796 $skipped = Sql_Affected_Rows(); 797 if ($skipped < 10000) { 798 while ($row = Sql_Fetch_Row($req)) { 799 $alive = checkLock($send_process_id); 800 if ($alive) 801 keepLock($send_process_id); 802 else 803 ProcessError($GLOBALS['I18N']->get('Process Killed by other process')); 804 array_push($doneusers,$row[0]); 805 } 806 } else { 807 processQueueOutput($GLOBALS['I18N']->get('Warning, disabling exclusion of done users, too many found')); 808 logEvent($GLOBALS['I18N']->get('Warning, disabling exclusion of done users, too many found')); 809 } 810 811 # also exclude unconfirmed users, otherwise they'll block the process 812 # will give quite different statistics than when used web based 813 # $req = Sql_Query("select id from {$tables["user"]} where !confirmed"); 814 # while ($row = Sql_Fetch_Row($req)) { 815 # array_push($doneusers,$row[0]); 816 # } 817 if (sizeof($doneusers)) 818 $exclusion = " and listuser.userid not in (".join(",",$doneusers).")"; 819 */ 820 821 if (USE_LIST_EXCLUDE) { 822 if (VERBOSE) { 823 processQueueOutput($GLOBALS['I18N']->get('looking for users who can be excluded from this mailing')); 824 } 825 if (count($msgdata['excludelist'])) { 826 $query 827 = ' select userid' 828 .' from '.$GLOBALS['tables']['listuser'] 829 .' where listid in ('.implode(',', $msgdata['excludelist']).')'; 830 if (VERBOSE) { 831 processQueueOutput('Exclude query '.$query); 832 } 833 $req = Sql_Query($query); 834 while ($row = Sql_Fetch_Row($req)) { 835 $um = Sql_Query(sprintf('replace into %s (entered,userid,messageid,status) values(now(),%d,%d,"excluded")', 836 $tables['usermessage'], $row[0], $messageid)); 837 } 838 } 839 } 840 841 /* 842 ## 8478 843 $query = sprintf('select distinct user.id from 844 %s as listuser, 845 %s as user, 846 %s as listmessage 847 where 848 listmessage.messageid = %d and 849 listmessage.listid = listuser.listid and 850 user.id = listuser.userid %s %s %s', 851 $tables['listuser'],$tables["user"],$tables['listmessage'], 852 $messageid, 853 $userconfirmed, 854 $exclusion, 855 $user_attribute_query);*/ 856 $queued = 0; 857 if (defined('MESSAGEQUEUE_PREPARE') && MESSAGEQUEUE_PREPARE) { 858 $query = sprintf('select userid from '.$tables['usermessage'].' where messageid = %d and status = "todo"', 859 $messageid); 860 $queued_count = Sql_Query($query); 861 $queued = Sql_Affected_Rows(); 862 // if (VERBOSE) { 863 cl_output('found pre-queued subscribers '.$queued, 0, 'progress'); 864 // } 865 } 866 867 //# if the above didn't find any, run the normal search (again) 868 if (empty($queued)) { 869 //# remove pre-queued messages, otherwise they wouldn't go out 870 Sql_Query(sprintf('delete from '.$tables['usermessage'].' where messageid = %d and status = "todo"', 871 $messageid)); 872 $removed = Sql_Affected_Rows(); 873 if ($removed) { 874 cl_output('removed pre-queued subscribers '.$removed, 0, 'progress'); 875 } 876 877 $query = sprintf('select distinct u.id from %s as listuser 878 inner join %s as u ON u.id = listuser.userid 879 inner join %s as listmessage ON listuser.listid = listmessage.listid 880 left join %s as um ON (um.messageid = %d and um.userid = listuser.userid) 881 where 882 listmessage.messageid = %d 883 and listmessage.listid = listuser.listid 884 and u.id = listuser.userid 885 and um.userid IS NULL 886 and u.confirmed and !u.blacklisted and !u.disabled 887 %s %s', 888 $tables['listuser'], 889 $tables['user'], 890 $tables['listmessage'], 891 $tables['usermessage'], 892 $messageid, $messageid, 893 $exclusion, $user_attribute_query 894 ); 895 } 896 897 if (VERBOSE) { 898 processQueueOutput('User select query '.$query); 899 } 900 901 $userids = Sql_Query($query); 902 if (Sql_Has_Error($database_connection)) { 903 ProcessError(Sql_Error($database_connection)); 904 } 905 906 // now we have all our users to send the message to 907 $counters['total_users_for_message '.$messageid] = Sql_Affected_Rows(); 908 909 if ($skipped >= 10000) { 910 $counters['total_users_for_message '.$messageid] -= $skipped; 911 } 912 913 $findUserEnd = $processqueue_timer->elapsed(1); 914 915 if ($findUserEnd - $findUserStart > 300 && !$GLOBALS['commandline']) { 916 processQueueOutput($GLOBALS['I18N']->get('Warning, finding the subscribers to send out to takes a long time, consider changing to commandline sending')); 917 } 918 919 if (empty($reload)) { 920 processQueueOutput($GLOBALS['I18N']->get('Found them').': '.$counters['total_users_for_message '.$messageid].' '.$GLOBALS['I18N']->get('to process')); 921 } 922 setMessageData($messageid, 'to process', $counters['total_users_for_message '.$messageid]); 923 924 if (defined('MESSAGEQUEUE_PREPARE') && MESSAGEQUEUE_PREPARE && empty($queued)) { 925 //# experimental MESSAGEQUEUE_PREPARE will first mark all messages as todo and then work it's way through the todo's 926 //# that should save time when running the queue multiple times, which avoids the user search after the first time 927 //# only do this first time, ie empty($queued); 928 //# the last run will pick up changes 929 while ($userdata = Sql_Fetch_Row($userids)) { 930 //# mark message/user combination as "todo" 931 $userid = $userdata[0]; // id of the user 932 Sql_Query(sprintf('replace into %s (entered,userid,messageid,status) values(now(),%d,%d,"todo")', 933 $tables['usermessage'], $userid, $messageid)); 934 } 935 //# rerun the initial query, in order to continue as normal 936 $query = sprintf('select userid from '.$tables['usermessage'].' where messageid = %d and status = "todo"', 937 $messageid); 938 $userids = Sql_Query($query); 939 $counters['total_users_for_message '.$messageid] = Sql_Affected_Rows(); 940 } 941 942 while ($userdata = Sql_Fetch_Row($userids)) { 943 $userid = $userdata[0]; // id of the user 944 945 /* 946 * when parallel processing stop when the number sent for this message reaches the limit 947 */ 948 if ($counters['max_users_for_message '.$messageid] 949 && $counters['sent_users_for_message '.$messageid] >= $counters['max_users_for_message '.$messageid] 950 ) { 951 if (VERBOSE) { 952 cl_output(s('Limit for this campaign reached: %d (%d)', 953 $counters['sent_users_for_message '.$messageid], 954 $counters['max_users_for_message '.$messageid])); 955 } 956 break; 957 } 958 /* 959 * when batch processing stop when the number sent reaches the batch limit 960 */ 961 if ($counters['num_per_batch'] && $counters['sent'] >= $counters['num_per_batch']) { 962 processQueueOutput(s('batch limit reached').': '.$counters['sent'].' ('.$counters['num_per_batch'].')', 963 1, 'progress'); 964 $GLOBALS['wait'] = $batch_period; 965 966 return; 967 } 968 $failure_reason = ''; 969 970 if (!empty($getspeedstats)) { 971 processQueueOutput('-----------------------------------'."\n".'start process user '.$userid); 972 } 973 $some = 1; 974 set_time_limit(120); 975 976 $secondsTogo = $finishSendingBefore - time(); 977 $stopSending = $secondsTogo < 0; 978 979 // check if we have been "killed" 980 // processQueueOutput('Process ID '.$send_process_id); 981 $alive = checkLock($send_process_id); 982 983 //# check for max-process-queue-time 984 $elapsed = $GLOBALS['processqueue_timer']->elapsed(1); 985 if ($maxProcessQueueTime && $elapsed > $maxProcessQueueTime && $counters['sent'] > 0) { 986 cl_output($GLOBALS['I18N']->get('queue processing time has exceeded max processing time ').$maxProcessQueueTime); 987 break; 988 } elseif ($alive && !$stopSending) { 989 keepLock($send_process_id); 990 } elseif ($stopSending) { 991 processQueueOutput($GLOBALS['I18N']->get('Campaign sending timed out, is past date to process until')); 992 break; 993 } else { 994 ProcessError($GLOBALS['I18N']->get('Process Killed by other process')); 995 } 996 997 // check if the message we are working on is still there and in process 998 $status = Sql_Fetch_Array_query("select id,status from {$tables['message']} where id = $messageid"); 999 if (!$status['id']) { 1000 ProcessError($GLOBALS['I18N']->get('Message I was working on has disappeared')); 1001 } elseif ($status['status'] != 'inprocess') { 1002 $script_stage = 6; 1003 ProcessError($GLOBALS['I18N']->get('Sending of this message has been suspended')); 1004 } 1005 flush(); 1006 1007 //# 1008 //Sql_Query(sprintf('delete from %s where userid = %d and messageid = %d and status = "active"',$tables['usermessage'],$userid,$messageid)); 1009 1010 // check whether the user has already received the message 1011 if (!empty($getspeedstats)) { 1012 processQueueOutput('verify message can go out to '.$userid); 1013 } 1014 1015 $um = Sql_Query(sprintf('select entered from %s where userid = %d and messageid = %d and status != "todo"', 1016 $tables['usermessage'], $userid, $messageid)); 1017 if (!Sql_Num_Rows($um)) { 1018 //# mark this message that we're working on it, so that no other process will take it 1019 //# between two lines ago and here, should hopefully be quick enough 1020 $userlock = Sql_Query(sprintf('replace into %s (entered,userid,messageid,status) values(now(),%d,%d,"active")', 1021 $tables['usermessage'], $userid, $messageid)); 1022 1023 if ($script_stage < 4) { 1024 $script_stage = 4; // we know a subscriber to send to 1025 } 1026 $someusers = 1; 1027 $users = Sql_query("select id,email,uniqid,htmlemail,confirmed,blacklisted,disabled from {$tables['user']} where id = $userid"); 1028 1029 // pick the first one (rather historical from before email was unique) 1030 $user = Sql_fetch_Assoc($users); 1031 if ($user['confirmed'] && is_email($user['email'])) { 1032 $userid = $user['id']; // id of the subscriber 1033 $useremail = $user['email']; // email of the subscriber 1034 $userhash = $user['uniqid']; // unique string of the user 1035 $htmlpref = $user['htmlemail']; // preference for HTML emails 1036 $confirmed = $user['confirmed'] && !$user['disabled']; //# 7 = disabled flag 1037 $blacklisted = $user['blacklisted']; 1038 $msgdata['counters'] = $counters; 1039 1040 $cansend = !$blacklisted && $confirmed; 1041 /* 1042 ## Ask plugins if they are ok with sending this message to this user 1043 */ 1044 if (!empty($getspeedstats)) { 1045 processQueueOutput('start check plugins '); 1046 } 1047 1048 reset($GLOBALS['plugins']); 1049 while ($cansend && $plugin = current($GLOBALS['plugins'])) { 1050 $cansend = $plugin->canSend($msgdata, $user); 1051 if (!$cansend) { 1052 $failure_reason .= 'Sending blocked by plugin '.$plugin->name; 1053 $counterIndex = 'send blocked by '.$plugin->name; 1054 if (!isset($counters[$counterIndex])) { 1055 $counters[$counterIndex] = 0; 1056 } 1057 ++$counters[$counterIndex]; 1058 if (VERBOSE) { 1059 cl_output('Sending blocked by plugin '.$plugin->name); 1060 } 1061 } 1062 1063 next($GLOBALS['plugins']); 1064 } 1065 if (!empty($getspeedstats)) { 1066 processQueueOutput('end check plugins '); 1067 } 1068 1069//################################### 1070// Throttling 1071 1072 $throttled = 0; 1073 if ($cansend && USE_DOMAIN_THROTTLE) { 1074 list($mailbox, $throttleDomain) = explode('@', $useremail); 1075 foreach ($GLOBALS['plugins'] as $pluginname => $plugin) { 1076 if ($newThrottleDomain = $plugin->throttleDomainMap($throttleDomain)) { 1077 $throttleDomain = $newThrottleDomain; 1078 break; 1079 } 1080 } 1081 $now = time(); 1082 $interval = $now - ($now % DOMAIN_BATCH_PERIOD); 1083 if (!isset($domainthrottle[$throttleDomain]) || !is_array($domainthrottle[$throttleDomain])) { 1084 $domainthrottle[$throttleDomain] = array( 1085 'interval' => '', 1086 'sent' => 0, 1087 'attempted' => 0, 1088 ); 1089 } elseif (isset($domainthrottle[$throttleDomain]['interval']) && $domainthrottle[$throttleDomain]['interval'] == $interval) { 1090 $throttled = $domainthrottle[$throttleDomain]['sent'] >= DOMAIN_BATCH_SIZE; 1091 if ($throttled) { 1092 ++$counters['send blocked by domain throttle']; 1093 ++$domainthrottle[$throttleDomain]['attempted']; 1094 if (DOMAIN_AUTO_THROTTLE 1095 && $domainthrottle[$throttleDomain]['attempted'] > 25 // skip a few before auto throttling 1096 && $num_messages <= 1 // only do this when there's only one message to process otherwise the other ones don't get a chance 1097 && $counters['total_users_for_message '.$messageid] < 1000 // and also when there's not too many left, because then it's likely they're all being throttled 1098 ) { 1099 $domainthrottle[$throttleDomain]['attempted'] = 0; 1100 logEvent(sprintf($GLOBALS['I18N']->get('There have been more than 10 attempts to send to %s that have been blocked for domain throttling.'), 1101 $throttleDomain)); 1102 logEvent($GLOBALS['I18N']->get('Introducing extra delay to decrease throttle failures')); 1103 if (VERBOSE) { 1104 processQueueOutput($GLOBALS['I18N']->get('Introducing extra delay to decrease throttle failures')); 1105 } 1106 if (!isset($running_throttle_delay)) { 1107 $running_throttle_delay = (int) (MAILQUEUE_THROTTLE + (DOMAIN_BATCH_PERIOD / (DOMAIN_BATCH_SIZE * 4))); 1108 } else { 1109 $running_throttle_delay += (int) (DOMAIN_BATCH_PERIOD / (DOMAIN_BATCH_SIZE * 4)); 1110 } 1111 //processQueueOutput("Running throttle delay: ".$running_throttle_delay); 1112 } elseif (VERBOSE) { 1113 processQueueOutput(sprintf($GLOBALS['I18N']->get('%s is currently over throttle limit of %d per %d seconds').' ('.$domainthrottle[$throttleDomain]['sent'].')', 1114 $throttleDomain, DOMAIN_BATCH_SIZE, DOMAIN_BATCH_PERIOD)); 1115 } 1116 } 1117 } 1118 } 1119 1120 if ($cansend) { 1121 $success = 0; 1122 if (!TEST) { 1123 reset($GLOBALS['plugins']); 1124 while (!$throttled && $plugin = current($GLOBALS['plugins'])) { 1125 $throttled = $plugin->throttleSend($msgdata, $user); 1126 if ($throttled) { 1127 if (!isset($counters['send throttled by plugin '.$plugin->name])) { 1128 $counters['send throttled by plugin '.$plugin->name] = 0; 1129 } 1130 ++$counters['send throttled by plugin '.$plugin->name]; 1131 $failure_reason .= 'Sending throttled by plugin '.$plugin->name; 1132 } 1133 next($GLOBALS['plugins']); 1134 } 1135 if (!$throttled) { 1136 if (VERBOSE) { 1137 processQueueOutput($GLOBALS['I18N']->get('Sending').' '.$messageid.' '.$GLOBALS['I18N']->get('to').' '.$useremail); 1138 } 1139 $emailSentTimer = new timer(); 1140 ++$counters['batch_count']; 1141 $success = sendEmail($messageid, $useremail, $userhash, 1142 $htmlpref); // $rssitems Obsolete by rssmanager plugin 1143 if (!$success) { 1144 ++$counters['sendemail returned false total']; 1145 ++$counters['sendemail returned false']; 1146 } else { 1147 $counters['sendemail returned false'] = 0; 1148 } 1149 if ($counters['sendemail returned false'] > 10) { 1150 foreach ($GLOBALS['plugins'] as $pluginname => $plugin) { 1151 $plugin->processError(s('Warning: a lot of errors while sending campaign %d', 1152 $messageid)); 1153 } 1154 } 1155 1156 if (VERBOSE) { 1157 processQueueOutput($GLOBALS['I18N']->get('It took').' '.$emailSentTimer->elapsed(1).' '.$GLOBALS['I18N']->get('seconds to send')); 1158 } 1159 } else { 1160 ++$throttlecount; 1161 } 1162 } else { 1163 $success = sendEmailTest($messageid, $useremail); 1164 ++$counters['sentastest']; 1165 ++$counters['batch_count']; 1166 setMessageData($messageid, 'sentastest', $counters['sentastest']); 1167 } 1168 1169 //############################ 1170 // tried to send email , process succes / failure 1171 if ($success) { 1172 if (USE_DOMAIN_THROTTLE) { 1173 if ($domainthrottle[$throttleDomain]['interval'] != $interval) { 1174 $domainthrottle[$throttleDomain]['interval'] = $interval; 1175 $domainthrottle[$throttleDomain]['sent'] = 1; 1176 } else { 1177 ++$domainthrottle[$throttleDomain]['sent']; 1178 } 1179 } 1180 ++$counters['sent']; 1181 ++$counters['sent_users_for_message '.$messageid]; 1182 $um = Sql_Query(sprintf('replace into %s (entered,userid,messageid,status) values(now(),%d,%d,"sent")', 1183 $tables['usermessage'], $userid, $messageid)); 1184 } else { 1185 ++$counters['failed_sent']; 1186 ++$counters['failed_sent_for_message '.$messageid]; 1187 //# need to check this, the entry shouldn't be there in the first place, so no need to delete it 1188 //# might be a cause for duplicated emails 1189 if (defined('MESSAGEQUEUE_PREPARE') && MESSAGEQUEUE_PREPARE) { 1190 Sql_Query(sprintf('update %s set status = "todo" where userid = %d and messageid = %d and status = "active"', 1191 $tables['usermessage'], $userid, $messageid)); 1192 } else { 1193 Sql_Query(sprintf('delete from %s where userid = %d and messageid = %d and status = "active"', 1194 $tables['usermessage'], $userid, $messageid)); 1195 } 1196 if (VERBOSE) { 1197 processQueueOutput($GLOBALS['I18N']->get('Failed sending to').' '.$useremail); 1198 logEvent("Failed sending message $messageid to $useremail"); 1199 } 1200 // make sure it's not because it's an underdeliverable email 1201 // unconfirm this user, so they're not included next time 1202 if (!$throttled && !validateEmail($useremail)) { 1203 ++$unconfirmed; 1204 ++$counters['email address invalidated']; 1205 logEvent("invalid email address $useremail user marked unconfirmed"); 1206 Sql_Query(sprintf('update %s set confirmed = 0 where email = "%s"', 1207 $GLOBALS['tables']['user'], $useremail)); 1208 } 1209 } 1210 1211 if ($script_stage < 5) { 1212 $script_stage = 5; // we have actually sent one user 1213 } 1214 if (isset($running_throttle_delay)) { 1215 sleep($running_throttle_delay); 1216 if ($counters['sent'] % 5 == 0) { 1217 // retry running faster after some more messages, to see if that helps 1218 unset($running_throttle_delay); 1219 } 1220 } elseif (MAILQUEUE_THROTTLE) { 1221 usleep(MAILQUEUE_THROTTLE * 1000000); 1222 } elseif (MAILQUEUE_BATCH_SIZE && MAILQUEUE_AUTOTHROTTLE) { 1223 $totaltime = $GLOBALS['processqueue_timer']->elapsed(1); 1224 $msgperhour = (3600 / $totaltime) * $counters['sent']; 1225 $msgpersec = $msgperhour / 3600; 1226 1227 //#11336 - this may cause "division by 0", but 'secpermsg' isn't used at all 1228 // $secpermsg = $totaltime / $counters['sent']; 1229 $target = (MAILQUEUE_BATCH_PERIOD / MAILQUEUE_BATCH_SIZE) * $counters['sent']; 1230 $delay = $target - $totaltime; 1231 1232 if ($delay > 0) { 1233 if (VERBOSE) { 1234 /* processQueueOutput($GLOBALS['I18N']->get('waiting for').' '.$delay.' '.$GLOBALS['I18N']->get('seconds').' '. 1235 $GLOBALS['I18N']->get('to make sure we don\'t exceed our limit of ').MAILQUEUE_BATCH_SIZE.' '. 1236 $GLOBALS['I18N']->get('messages in ').' '.MAILQUEUE_BATCH_PERIOD.$GLOBALS['I18N']->get('seconds')); */ 1237 processQueueOutput(sprintf($GLOBALS['I18N']->get('waiting for %.1f seconds to meet target of %s seconds per message'), 1238 $delay, (MAILQUEUE_BATCH_PERIOD / MAILQUEUE_BATCH_SIZE)) 1239 ); 1240 } 1241 usleep($delay * 1000000); 1242 } 1243 } 1244 } else { 1245 ++$cannotsend; 1246 // mark it as sent anyway, because otherwise the process will never finish 1247 if (VERBOSE) { 1248 processQueueOutput($GLOBALS['I18N']->get('not sending to ').$useremail); 1249 } 1250 $um = Sql_query("replace into {$tables['usermessage']} (entered,userid,messageid,status) values(now(),$userid,$messageid,\"not sent\")"); 1251 } 1252 1253 // update possible other users matching this email as well, 1254 // to avoid duplicate sending when people have subscribed multiple times 1255 // bit of legacy code after making email unique in the database 1256 // $emails = Sql_query("select * from {$tables['user']} where email =\"$useremail\""); 1257 // while ($email = Sql_fetch_row($emails)) 1258 // Sql_query("replace into {$tables['usermessage']} (userid,messageid) values($email[0],$messageid)"); 1259 } else { 1260 // some "invalid emails" are entirely empty, ah, that is because they are unconfirmed 1261 1262 //# this is quite old as well, with the preselection that avoids unconfirmed users 1263 // it is unlikely this is every processed. 1264 1265 if (!$user['confirmed'] || $user['disabled']) { 1266 if (VERBOSE) { 1267 processQueueOutput($GLOBALS['I18N']->get('Unconfirmed user').': '.$userid.' '.$user['email'].' '.$user['id']); 1268 } 1269 ++$unconfirmed; 1270 // when running from commandline we mark it as sent, otherwise we might get 1271 // stuck when using batch processing 1272 // if ($GLOBALS["commandline"]) { 1273 $um = Sql_query("replace into {$tables['usermessage']} (entered,userid,messageid,status) values(now(),$userid,$messageid,\"unconfirmed user\")"); 1274 // } 1275 } elseif ($user['email'] || $user['id']) { 1276 if (VERBOSE) { 1277 processQueueOutput(s('Invalid email address').': '.$user['email'].' '.$user['id']); 1278 } 1279 logEvent(s('Invalid email address').': userid '.$user['id'].' email '.$user['email']); 1280 // mark it as sent anyway 1281 if ($user['id']) { 1282 $um = Sql_query(sprintf('replace into %s (entered,userid,messageid,status) values(now(),%d,%d,"invalid email address")', 1283 $tables['usermessage'], $userid, $messageid)); 1284 Sql_Query(sprintf('update %s set confirmed = 0 where id = %d', 1285 $GLOBALS['tables']['user'], $user['id'])); 1286 addUserHistory( 1287 $user['email'], 1288 s('Subscriber marked unconfirmed for invalid email address'), 1289 s('Marked unconfirmed while sending campaign %d', $messageid) 1290 ); 1291 } 1292 ++$counters['invalid']; 1293 } 1294 } 1295 } else { 1296 1297 //# and this is quite historical, and also unlikely to be every called 1298 // because we now exclude users who have received the message from the 1299 // query to find users to send to 1300 1301 //# when trying to send the message, it was already marked for this user 1302 //# June 2010, with the multiple send process extension, that's quite possible to happen again 1303 1304 $um = Sql_Fetch_Row($um); 1305 ++$notsent; 1306 if (VERBOSE) { 1307 processQueueOutput($GLOBALS['I18N']->get('Not sending to').' '.$userid.', '.$GLOBALS['I18N']->get('already sent').' '.$um[0]); 1308 } 1309 } 1310 $status = Sql_query("update {$tables['message']} set processed = processed + 1 % 16000000 where id = $messageid"); 1311 $processed = $notsent + $counters['sent'] + $counters['invalid'] + $unconfirmed + $cannotsend + $counters['failed_sent']; 1312 //if ($processed % 10 == 0) { 1313 if (0) { 1314 processQueueOutput('AR'.$affrows.' N '.$counters['total_users_for_message '.$messageid].' P'.$processed.' S'.$counters['sent'].' N'.$notsent.' I'.$counters['invalid'].' U'.$unconfirmed.' C'.$cannotsend.' F'.$counters['failed_sent']); 1315 $rn = $reload * $counters['num_per_batch']; 1316 processQueueOutput('P '.$processed.' N'.$counters['total_users_for_message '.$messageid].' NB'.$counters['num_per_batch'].' BT'.$batch_total.' R'.$reload.' RN'.$rn); 1317 } 1318 /* 1319 * don't calculate this here, but in the "msgstatus" instead, so that 1320 * the total speed can be calculated, eg when there are multiple send processes 1321 * 1322 * re-added for commandline outputting 1323 */ 1324 1325 $totaltime = $GLOBALS['processqueue_timer']->elapsed(1); 1326 if ($counters['sent'] > 0) { 1327 $msgperhour = (3600 / $totaltime) * $counters['sent']; 1328 $secpermsg = $totaltime / $counters['sent']; 1329 $timeleft = ($counters['total_users_for_message '.$messageid] - $counters['sent']) * $secpermsg; 1330 $eta = date('D j M H:i', time() + $timeleft); 1331 } else { 1332 $msgperhour = 0; 1333 $secpermsg = 0; 1334 $timeleft = 0; 1335 $eta = $GLOBALS['I18N']->get('unknown'); 1336 } 1337 ++$counters['processed_users_for_message '.$messageid]; 1338 setMessageData($messageid, 'ETA', $eta); 1339 setMessageData($messageid, 'msg/hr', "$msgperhour"); 1340 1341 cl_progress('sent '.$counters['sent'].' ETA '.$eta.' sending '.sprintf('%d', 1342 $msgperhour).' msg/hr'); 1343 1344 setMessageData($messageid, 'to process', 1345 $counters['total_users_for_message '.$messageid] - $counters['processed_users_for_message '.$messageid]); 1346 setMessageData($messageid, 'last msg sent', time()); 1347 // setMessageData($messageid,'totaltime',$GLOBALS['processqueue_timer']->elapsed(1)); 1348 if (!empty($getspeedstats)) { 1349 processQueueOutput('end process user '."\n".'-----------------------------------'."\n".$userid); 1350 } 1351 } 1352 $processed = $notsent + $counters['sent'] + $counters['invalid'] + $unconfirmed + $cannotsend + $counters['failed_sent']; 1353 processQueueOutput(s('Processed %d out of %d subscribers', $counters['processed_users_for_message '.$messageid], 1354 $counters['total_users_for_message '.$messageid]), 1, 'progress'); 1355 1356 if ($counters['total_users_for_message '.$messageid] - $counters['processed_users_for_message '.$messageid] <= 0 || $stopSending) { 1357 // this message is done 1358 if (!$someusers) { 1359 processQueueOutput($GLOBALS['I18N']->get('Hmmm, No users found to send to'), 1, 'progress'); 1360 } 1361 if (!$counters['failed_sent']) { 1362 repeatMessage($messageid); 1363 foreach ($GLOBALS['plugins'] as $pluginname => $plugin) { 1364 $plugin->processSendingCampaignFinished($messageid, $msgdata); 1365 } 1366 $status = Sql_query(sprintf('update %s set status = "sent",sent = now() where id = %d', 1367 $GLOBALS['tables']['message'], $messageid)); 1368 1369 if (!empty($msgdata['notify_end']) && !isset($msgdata['end_notified'])) { 1370 $notifications = explode(',', $msgdata['notify_end']); 1371 foreach ($notifications as $notification) { 1372 sendMail($notification, $GLOBALS['I18N']->get('Message campaign finished'), 1373 s('phpList has finished sending the campaign with subject %s', $msgdata['subject'])."\n\n". 1374 s('to view the statistics of this campaign, go to %s://%s', $GLOBALS['admin_scheme'], 1375 getConfig('website').$GLOBALS['adminpages'].'/?page=statsoverview&id='.$messageid) 1376 ); 1377 } 1378 Sql_Query(sprintf('insert ignore into %s (name,id,data) values("end_notified",%d,now())', 1379 $GLOBALS['tables']['messagedata'], $messageid)); 1380 } 1381 $rs = Sql_Query(sprintf('select sent, sendstart from %s where id = %d', $tables['message'], $messageid)); 1382 $timetaken = Sql_Fetch_Row($rs); 1383 processQueueOutput($GLOBALS['I18N']->get('It took').' '.timeDiff($timetaken[0], 1384 $timetaken[1]).' '.$GLOBALS['I18N']->get('to send this message')); 1385 sendMessageStats($messageid); 1386 } 1387 //# flush cached message track stats to the DB 1388 if (isset($GLOBALS['cached']['linktracksent'])) { 1389 flushClicktrackCache(); 1390 // we're done with $messageid, so get rid of the cache 1391 unset($GLOBALS['cached']['linktracksent'][$messageid]); 1392 } 1393 } else { 1394 if ($script_stage < 5) { 1395 $script_stage = 5; 1396 } 1397 } 1398} 1399 1400if (!$num_messages) { 1401 $script_stage = 6; 1402} // we are done 1403# shutdown will take care of reporting 1404