1<?php
2
3//# temporarily remove this check, to make sure processing the queue with a remote call continues to work
4//# https://mantis.phplist.com/view.php?id=17316
5//verifyCsrfGetToken();
6
// Credentials on the query string are rejected: the message below tells the
// caller that remote processing now uses a processing secret.
if (isset($_GET['password']) || isset($_GET['login'])) {
    echo Error(s('Remote processing of the queue is now handled with a processing secret'));

    return;
}

if (!$inRemoteCall) {
    // we're in a normal session, so the csrf token should work
    verifyCsrfGetToken();
} else {
    // check that we actually still want remote queue processing
    $pqChoice = getConfig('pqchoice');
    if (SHOW_PQCHOICE && $pqChoice != 'phplistdotcom') {
        // report zero campaigns to the caller and stop right here
        $counters['campaigns'] = 0;
        echo outputCounters();
        exit;
    }
}
25
require_once __DIR__.'/../accesscheck.php';
require_once __DIR__.'/../sendemaillib.php';

$status = 'OK';
$processqueue_timer = new timer();
$domainthrottle = array();

// check for other processes running: a commandline run with the "f" option,
// or a remote call, requests the page lock with the force flag set
if ((empty($GLOBALS['commandline']) || !isset($cline['f'])) && !$inRemoteCall) {
    $send_process_id = getPageLock();
} else {
    // force set, so kill other processes
    cl_output('Force set, killing other send processes');
    $send_process_id = getPageLock(1);
}

// without a lock there is nothing we can safely do
if (empty($send_process_id)) {
    processQueueOutput(s('Unable get lock for processing'));
    $status = s('Error processing');

    return;
}

// in maintenance mode: give the lock back and stop
$mm = inMaintenanceMode();
if ($mm) {
    processQueueOutput(s('The system is in maintenance mode, stopping. Try again later.'));
    $status = s('In maintenance mode, try again later.');
    releaseLock($send_process_id);

    return;
}

//cl_output('page locked on '.$send_process_id);

// the reload counter is only taken from the request for non-commandline runs
$reload = 0;
if (isset($_GET['reload']) && empty($GLOBALS['commandline'])) {
    $reload = sprintf('%d', $_GET['reload']);
}

//# this one sends a notification to plugins that processing has started
foreach ($GLOBALS['plugins'] as $pluginname => $plugin) {
    $plugin->processQueueStart();
}
68
//# let's make sure all subscribers have a uniqid
//# only when on CL
if ($GLOBALS['commandline']) {
    $req = Sql_Query(sprintf(
        'select id from %s where uniqid is NULL or uniqid = ""',
        $GLOBALS['tables']['user']
    ));
    $num = Sql_Affected_Rows();
    if ($num) {
        cl_output(s('Giving a Unique ID to %d subscribers, this may take a while', $num));
        while ($row = Sql_Fetch_Row($req)) {
            Sql_query(sprintf(
                'update %s set uniqid = "%s" where id = %d',
                $GLOBALS['tables']['user'],
                getUniqID(),
                $row[0]
            ));
        }
    }
}

// make sure subscribers have a UUID. They may not when created via eg the API
$req = Sql_Query(sprintf(
    'select id from %s where uuid is NULL or uuid = ""',
    $GLOBALS['tables']['user']
));
$num = Sql_Affected_Rows();
if ($num) {
    cl_output(s('Giving a UUID to %d subscribers, this may take a while', $num));
    processQueueOutput(s('Giving a UUID to %d subscribers, this may take a while', $num));
    while ($row = Sql_Fetch_Row($req)) {
        Sql_query(sprintf(
            'update %s set uuid = "%s" where id = %d',
            $GLOBALS['tables']['user'],
            (string) uuid::generate(4),
            $row[0]
        ));
    }
}

// make sure campaigns have a UUID. They may not when created via eg the API
$req = Sql_Query(sprintf(
    'select id from %s where uuid is NULL or uuid = ""',
    $GLOBALS['tables']['message']
));
$num = Sql_Affected_Rows();
if ($num) {
    cl_output(s('Giving a UUID to %d campaigns', $num));
    while ($row = Sql_Fetch_Row($req)) {
        Sql_query(sprintf(
            'update %s set uuid = "%s" where id = %d',
            $GLOBALS['tables']['message'],
            (string) uuid::generate(4),
            $row[0]
        ));
    }
}

// keep for now, just in case: create the user_message_view table when it is missing
if (!Sql_Table_exists($GLOBALS['tables']['user_message_view'])) {
    cl_output(s('Creating new table "user_message_view"'));
    createTable('user_message_view');
}
106
// batch bookkeeping; -1 marks "no ISP limit read yet" for the two limits below
$maxbatch = -1;
$minbatchperiod = -1;
$batch_period = 0;
$counters['num_per_batch'] = 0;

$script_stage = 0; // start
$skipped = 0;
$someusers = 0;

// check for batch limits
$ISPrestrictions = '';
$ISPlockfile = '';
//$rssitems = array(); //Obsolete by rssmanager plugin
$user_attribute_query = '';

// figures handed over on the query string by the previous page load, if any
$lastsent = 0;
if (!empty($_GET['lastsent'])) {
    $lastsent = sprintf('%d', $_GET['lastsent']);
}
$lastskipped = 0;
if (!empty($_GET['lastskipped'])) {
    $lastskipped = sprintf('%d', $_GET['lastskipped']);
}
121
// Read optional ISP-imposed limits from /etc/phplist.conf. The file holds
// "key=value" lines; the keys maxbatch, minbatchperiod and lockfile are used.
// file_get_contents() is used rather than fread(filesize()) so that an empty
// file does not ask fread() for a zero-length read.
$contents = @file_get_contents('/etc/phplist.conf');
if ($contents !== false) {
    $ISPrestrictions = $GLOBALS['I18N']->get('The following restrictions have been set by your ISP:')."\n";
    $lines = explode("\n", $contents);
    foreach ($lines as $line) {
        // blank lines (including the trailing one) and anything else without
        // a "=" carry no setting; skipping them avoids an undefined offset
        if (strpos($line, '=') === false) {
            continue;
        }
        // limit of 2 keeps values that themselves contain "=" intact
        list($key, $val) = explode('=', $line, 2);
        // tolerate "key = value" spacing and CRLF line endings
        $key = trim($key);
        $val = trim($val);

        switch ($key) {
            case 'maxbatch':
                $maxbatch = sprintf('%d', $val);
                $ISPrestrictions .= "$key = $val\n";
                break;
            case 'minbatchperiod':
                $minbatchperiod = sprintf('%d', $val);
                $ISPrestrictions .= "$key = $val\n";
                break;
            case 'lockfile':
                $ISPlockfile = $val;
                break;
        }
    }
}
// Combine the configured batch size/period with the ISP limits read above:
// the smaller batch size and the longer batch period win.
if (MAILQUEUE_BATCH_SIZE) {
    $counters['num_per_batch'] = $maxbatch > 0
        ? min(MAILQUEUE_BATCH_SIZE, $maxbatch)
        : sprintf('%d', MAILQUEUE_BATCH_SIZE);

    if (MAILQUEUE_BATCH_PERIOD) {
        $batch_period = $minbatchperiod > 0
            ? max(MAILQUEUE_BATCH_PERIOD, $minbatchperiod)
            : MAILQUEUE_BATCH_PERIOD;
    }
} elseif ($maxbatch > 0) {
    // no batch size configured, so only the ISP limit applies
    $counters['num_per_batch'] = $maxbatch;
}
162
163//# force batch processing in small batches when called from the web interface
164/*
165 * bad idea, we shouldn't touch the batch settings, in case they are very specific for
166 * ISP restrictions, instead limit webpage processing by time (below)
167 *
168if (empty($GLOBALS['commandline'])) {
169  $counters['num_per_batch'] = min($counters['num_per_batch'],100);
170  $batch_period = max($batch_period,1);
171} elseif (isset($cline['m'])) {
172  $cl_num_per_batch = sprintf('%d',$cline['m']);
173  ## don't block when the param is not a number
174  if (!empty($cl_num_per_batch)) {
175    $counters['num_per_batch'] = $cl_num_per_batch;
176  }
177  cl_output("Batch set with commandline to ".$counters['num_per_batch']);
178}
179*/
// Maximum number of seconds for this run; stays 0 when MAX_PROCESSQUEUE_TIME
// is undefined or not positive.
$maxProcessQueueTime = 0;
if (defined('MAX_PROCESSQUEUE_TIME') && MAX_PROCESSQUEUE_TIME > 0) {
    $maxProcessQueueTime = (int) MAX_PROCESSQUEUE_TIME;
}
// in-page processing force to a minute max, and make sure there's a batch size
if (empty($GLOBALS['commandline'])) {
    // NOTE(review): when no maximum is configured this remains 0 (min(0, 60)),
    // so the "minute max" only applies to configured values - confirm intended.
    $maxProcessQueueTime = min($maxProcessQueueTime, 60);
    if ($counters['num_per_batch'] <= 0) {
        $counters['num_per_batch'] = 10000;
    }
}

if (VERBOSE && $maxProcessQueueTime) {
    // 'progress' is the output target (third parameter). It used to be passed
    // in the $logit position, which sent the line to the default 'summary'
    // target; 1 keeps the event logged, as the truthy string did before.
    processQueueOutput(s('Maximum time for queue processing').': '.$maxProcessQueueTime, 1, 'progress');
}
195
// Commandline "m" option: cap the batch at the requested maximum, using small
// batches when that maximum is low.
if (isset($cline['m'])) {
    cl_output('Max to send is '.$cline['m'].' num per batch is '.$counters['num_per_batch']);
    $clinemax = (int) $cline['m'];
    //# slow down just before max
    switch (true) {
        case $clinemax < 20:
            $counters['num_per_batch'] = min(2, $clinemax, $counters['num_per_batch']);
            break;
        case $clinemax < 200:
            $counters['num_per_batch'] = min(20, $clinemax, $counters['num_per_batch']);
            break;
        default:
            $counters['num_per_batch'] = min($clinemax, $counters['num_per_batch']);
    }
    cl_output('Max to send is '.$cline['m'].' setting num per batch to '.$counters['num_per_batch']);
}
209
// remember the configured batch size before this run's deduction
$original_num_per_batch = $counters['num_per_batch'];
if ($counters['num_per_batch'] && $batch_period) {
    // check how many were sent in the last batch period and take off that
    // amount from this batch
    $recently_sent = Sql_Fetch_Row_Query(sprintf(
        'select count(*) from %s where entered > date_sub(now(),interval %d second) and status = "sent"',
        $tables['usermessage'],
        $batch_period
    ));
    cl_output('Recently sent : '.$recently_sent[0]);
    $counters['num_per_batch'] = $counters['num_per_batch'] - $recently_sent[0];

    // an exact 0 is turned into -1, so that a used-up quota always shows as a
    // negative batch size (a negative remainder is left as it is)
    if (0 == $counters['num_per_batch']) {
        $counters['num_per_batch'] = -1;
    }
}
// output some stuff to make sure it's not buffered in the browser:
// 10000 two-space chunks, with a newline after every 100th one
for ($i = 0; $i < 10000; ++$i) {
    echo ($i % 100 == 0) ? "  \n" : '  ';
}

// stylesheet and script references for the output
echo '<style type="text/css" src="css/app.css"></style>',
    '<style type="text/css" src="ui/'.$GLOBALS['ui'].'/css/style.css"></style>',
    '<script type="text/javascript" src="js/'.$GLOBALS['jQuery'].'"></script>';
//# not sure this works, but would be nice
echo '<script type="text/javascript">$("#favicon").attr("href","images/busy.gif");</script>';

flush();

// report keeps track of what is going on
$report = '';
$nothingtodo = 0;
// cache the message from the database to avoid reloading it every time
$cached = array();
246
/**
 * Shutdown handler for queue processing.
 *
 * Reports the totals of this run, hands the statistics to the plugins,
 * flushes the click-track cache, releases the page lock and sends the report
 * through finish(). Depending on the stage the script reached it then warns,
 * emits the JavaScript that reloads the page for the next batch, or signals
 * "all done" to the parent window. For remote calls the counters are echoed
 * via outputCounters().
 *
 * All state is read from globals. The handler can run before the send
 * counters have been initialised (the script can exit or return earlier), so
 * those counters are read with a default of 0.
 */
function my_shutdown()
{
    global $script_stage, $reload;
    global $counters, $report, $send_process_id, $tables, $nothingtodo, $processed, $notsent, $unconfirmed, $batch_period;

    // defaults for counters that are only set once sending actually starts
    $sent = empty($counters['sent']) ? 0 : $counters['sent'];
    $invalid = empty($counters['invalid']) ? 0 : $counters['invalid'];
    $failedSent = empty($counters['failed_sent']) ? 0 : $counters['failed_sent'];

//  processQueueOutput( "Script status: ".connection_status(),0); # with PHP 4.2.1 buggy. http://bugs.php.net/bug.php?id=17774
    processQueueOutput(s('Script stage').': '.$script_stage, 0, 'progress');
    $some = $processed;
    $delaytime = 0;
    if (!$some) {
        processQueueOutput($GLOBALS['I18N']->get('Finished, Nothing to do'), 0, 'progress');
        $nothingtodo = 1;
    }

    $totaltime = $GLOBALS['processqueue_timer']->elapsed(1);
    if ($totaltime > 0) {
        $msgperhour = (3600 / $totaltime) * $sent;
    } else {
        $msgperhour = s('Calculating');
    }
    if ($sent) {
        processQueueOutput(sprintf('%d %s %01.2f %s (%d %s)', $sent,
            $GLOBALS['I18N']->get('messages sent in'),
            $totaltime, $GLOBALS['I18N']->get('seconds'), $msgperhour, $GLOBALS['I18N']->get('msgs/hr')),
            $sent, 'progress');
    }
    if ($invalid) {
        processQueueOutput(s('%d invalid email addresses', $invalid), 1, 'progress');
    }
    if ($failedSent) {
        processQueueOutput(s('%d failed (will retry later)', $failedSent), 1, 'progress');
        // dump every counter on the commandline to help diagnose the failures
        foreach ($counters as $label => $value) {
            //  processQueueOutput(sprintf('%d %s',$value,$GLOBALS['I18N']->get($label)),1,'progress');
            cl_output(sprintf('%d %s', $value, $GLOBALS['I18N']->get($label)));
        }
    }
    if ($unconfirmed) {
        processQueueOutput(sprintf($GLOBALS['I18N']->get('%d emails unconfirmed (not sent)'), $unconfirmed), 1,
            'progress');
    }

    foreach ($GLOBALS['plugins'] as $pluginname => $plugin) {
        $plugin->processSendStats($sent, $invalid, $failedSent, $unconfirmed, $counters);
    }

    flushClickTrackCache();
    releaseLock($send_process_id);

    finish('info', $report, $script_stage);
    if ($script_stage < 5 && !$nothingtodo) {
        processQueueOutput($GLOBALS['I18N']->get('Warning: script never reached stage 5')."\n".$GLOBALS['I18N']->get('This may be caused by a too slow or too busy server')." \n");
    } elseif ($script_stage == 5 && (!$nothingtodo || isset($GLOBALS['wait']))) {
        // if the script timed out in stage 5, reload the page to continue with the rest
        ++$reload;
        if (empty($GLOBALS['commandline']) && !empty($counters['num_per_batch']) && $batch_period) {
            if ($sent + 10 < $GLOBALS['original_num_per_batch']) {
                processQueueOutput($GLOBALS['I18N']->get('Less than batch size were sent, so reloading imminently'), 1,
                    'progress');
                $counters['delaysend'] = 10;
            } else {
                $counters['delaysend'] = (int) ($batch_period - $totaltime);
                $delaytime = 30; //# actually with the iframe we can reload fairly quickly
                processQueueOutput(s('Waiting for %d seconds before reloading', $delaytime), 1, 'progress');
            }
        }
        // NOTE(review): this unconditionally replaces the delaysend values set
        // just above (including the 10) - confirm that is intended.
        $counters['delaysend'] = (int) ($batch_period - $totaltime);
        if (empty($GLOBALS['inRemoteCall']) && empty($GLOBALS['commandline'])) {
            if (defined('JSLEEPMETHOD')) {
                // let the browser wait before it reloads
                printf('<script type="text/javascript">
                setTimeout(function() {
                    document.location = "./?page=pageaction&action=processqueue&ajaxed=true&reload=%d&lastsent=%d&lastskipped=%d%s";
                }, %d);
                </script></body></html>', $reload, $sent, $notsent, addCsrfGetToken(), $delaytime*1000);
            } else {
                // wait on the server, then reload immediately
                sleep($delaytime);
                printf('<script type="text/javascript">
                document.location = "./?page=pageaction&action=processqueue&ajaxed=true&reload=%d&lastsent=%d&lastskipped=%d%s";
                </script></body></html>', $reload, $sent, $notsent, addCsrfGetToken());
            }
        }
    } elseif ($script_stage == 6 || $nothingtodo) {
        foreach ($GLOBALS['plugins'] as $pluginname => $plugin) {
            $plugin->messageQueueFinished();
        }
        processQueueOutput($GLOBALS['I18N']->get('Finished, All done'), 0);
        echo '<script type="text/javascript">
      var parentJQuery = window.parent.jQuery;
      window.parent.allDone("' .s('All done').'");
      </script>';
    } else {
        processQueueOutput(s('Script finished, but not all messages have been sent yet.'));
    }
    if (!empty($GLOBALS['inRemoteCall'])) {
        // drop whatever was buffered so far; the remote caller only gets the counters
        if (ob_get_level() > 0) {
            ob_end_clean();
        }
        echo outputCounters();
        @ob_start();
    }

    if (empty($GLOBALS['inRemoteCall']) && empty($GLOBALS['commandline']) && empty($_GET['ajaxed'])) {
        return;
    } elseif (!empty($GLOBALS['inRemoteCall']) || !empty($GLOBALS['commandline'])) {
        @ob_end_clean();
    }
    exit;
}

register_shutdown_function('my_shutdown');
354
355//# some general functions
/**
 * Close off a processing run: update the progress meter in the parent window
 * and send the processing report.
 *
 * The report goes to the first plugin whose sendReport() returns a truthy
 * value; when no plugin sends it, sendReport() sends it with an intro and a
 * footer. Nothing is output or sent for remote calls or when there was
 * nothing to do; the report is also skipped when TEST is set or
 * SEND_QUEUE_PROCESSING_REPORT is off.
 *
 * @param string $flag         'error' selects the error subject; any other value the report subject
 * @param string $message      report body
 * @param int    $script_stage stage the script reached; not used in this function. Optional,
 *                             because callers in this file pass only two arguments, which an
 *                             obligatory parameter turns into an ArgumentCountError.
 */
function finish($flag, $message, $script_stage = 0)
{
    global $nothingtodo, $counters, $messageid;

    // always end up with a subject, also for an unexpected flag
    if ($flag == 'error') {
        $subject = s('Message queue processing errors');
    } else {
        $subject = s('Message queue processing report');
    }
    $isRemoteCall = !empty($GLOBALS['inRemoteCall']);

    if (!$nothingtodo && !$isRemoteCall) {
        // the counters may not exist yet when the run stopped early
        $sent = isset($counters['sent']) ? $counters['sent'] : 0;
        $totalKey = 'total_users_for_message '.$messageid;
        $total = isset($counters[$totalKey]) ? $counters[$totalKey] : 0;

        processQueueOutput(s('Finished this run'), 1, 'progress');
        echo '<script type="text/javascript">
      var parentJQuery = window.parent.jQuery;
      parentJQuery("#progressmeter").updateSendProgress("' .$sent.','.$total.'");
      </script>';
    }
    if (!$isRemoteCall && !TEST && !$nothingtodo && SEND_QUEUE_PROCESSING_REPORT) {
        $reportSent = false;

        // Execute plugin hooks for sending report
        // @@TODO work out a way to deal with the order of processing the plugins
        // as that can make a difference here.
        foreach ($GLOBALS['plugins'] as $pluginname => $plugin) {
            if (!$reportSent) {
                $reportSent = $plugin->sendReport($subject, $message);
            }
        }

        // If plugins have not sent the report, send it the default way
        if (!$reportSent) {
            $messageWithIntro = s('The following events occured while processing the message queue:')."\n".$message;
            $messageWithIntroAndFooter = $messageWithIntro."\n\n".s('To stop receiving these reports read:').' https://resources.phplist.com/system/config/send_queue_processing_report'."\n\n";
            sendReport($subject, $messageWithIntroAndFooter);
        }
    }
}
391
/**
 * Add the error to the global report, output it and stop the script.
 *
 * @param string $message error description
 */
function ProcessError($message)
{
    global $report;

    $report = $report.$message;
    processQueueOutput('Error: '.$message);
    exit();
}
399
/**
 * Output a progress message for the current processing mode.
 *
 * - commandline: the message goes to cl_output(), with tags stripped and the
 *   timer interval and query count appended;
 * - remote call: output is suppressed and the function returns at once, so
 *   nothing is added to the report or the event log;
 * - web: every line of the message is echoed as a div and appended, through
 *   inline JavaScript, to the "#processqueue<target>" element of the parent
 *   window, followed by whitespace padding and a flush.
 *
 * In commandline and web mode the message is also appended to the global
 * $report and, when $logit is truthy, passed to logEvent().
 *
 * @param string|array $message text to output; an array is flattened to "key=value; " pairs
 * @param int|bool     $logit   whether to pass the message to logEvent()
 * @param string       $target  suffix of the parent window element id that receives the output
 */
function processQueueOutput($message, $logit = 1, $target = 'summary')
{
    global $report, $shadecount;

    if (!isset($shadecount)) {
        $shadecount = 0;
    }
    if (is_array($message)) {
        $tmp = '';
        foreach ($message as $key => $val) {
            $tmp .= $key.'='.$val.'; ';
        }
        $message = $tmp;
    }
    if (!empty($GLOBALS['commandline'])) {
        cl_output(strip_tags($message).' ['.$GLOBALS['processqueue_timer']->interval(1).'] ('.$GLOBALS['pagestats']['number_of_queries'].')');
        $infostring = '['.date('D j M Y H:i', time()).'] [CL]';
    } elseif (!empty($GLOBALS['inRemoteCall'])) {
        //# with a remote call we suppress output
        @ob_end_clean();
        @ob_start();

        return;
    } else {
        $infostring = '['.date('D j M Y H:i', time()).'] ['.getClientIP().']';
        // whitespace that follows every line: 10000 two-space chunks with a
        // newline after every 100th one, built once instead of once per line
        $padding = str_repeat("  \n".str_repeat('  ', 99), 100);

        //print "$infostring $message<br/>\n";
        $lines = explode("\n", $message);
        foreach ($lines as $line) {
            $line = preg_replace('/"/', '\"', $line);

            //# contribution in forums, http://forums.phplist.com/viewtopic.php?p=14648
            //Replace the "&rsquo;" which is not replaced by html_decode
            $line = preg_replace('/&rsquo;/', "'", $line);
            //Decode HTML chars
            $line = html_entity_decode($line, ENT_QUOTES, 'UTF-8');

            echo "\n".'<div class="output shade'.$shadecount.'">'.$line.'</div>';
            $line = str_replace("'", "\'", $line); // #16880 - avoid JS error
            echo '<script type="text/javascript">
      var parentJQuery = window.parent.jQuery;
      parentJQuery("#processqueue' .$target.'").append(\'<div class="output shade'.$shadecount.'">'.$line.'</div>\');
      parentJQuery("#processqueue' .$target.'").animate({scrollTop:100000}, "slow");
      </script>';
            // alternate between 0 and 1; a boolean toggle would render the
            // class as "shade" (false casts to an empty string), not "shade0"
            $shadecount = $shadecount ? 0 : 1;
            echo $padding;
        }
        @ob_flush();
        flush();
    }

    $report .= "\n$infostring $message";
    if ($logit) {
        logEvent($message);
    }
    flush();
}
467
/**
 * Serialise the global $counters array for the caller.
 *
 * @return string JSON when json_encode() is available, otherwise
 *                "key=value;" pairs with the PHP version added as PHPVERSION
 */
function outputCounters()
{
    global $counters;

    if (!function_exists('json_encode')) {
        // only PHP before 5.2.0 gets here
        //# keep track of which php versions we need to continue to support
        $counters['PHPVERSION'] = phpversion();
        $pairs = '';
        foreach ($counters as $name => $value) {
            $pairs .= $name.'='.$value.';';
        }

        return $pairs;
    }

    return json_encode($counters);
}
484
/**
 * Stand-in for sending a message: records that the message would have been
 * sent, waits 0.75 seconds and reports success.
 *
 * With VERBOSE on the line is output through processQueueOutput(), otherwise
 * it is only appended to the global $report.
 *
 * @param int    $messageid id of the campaign
 * @param string $email     address the message would have gone to
 *
 * @return bool always true
 */
function sendEmailTest($messageid, $email)
{
    global $report;

    // the id and "to" are now separated by a space (they used to run together)
    $line = $GLOBALS['I18N']->get('(test)').' '.$GLOBALS['I18N']->get('Would have sent').' '.$messageid.' '.$GLOBALS['I18N']->get('to').' '.$email;
    if (VERBOSE) {
        processQueueOutput($line);
    } else {
        $report .= "\n".$line;
    }
    // fake a bit of a delay,
    usleep(750000);
    // and say it was fine.
    return true;
}
498
// we do not want to timeout or abort
$abort = ignore_user_abort(true);
set_time_limit(600);
flush();

if (empty($reload)) { //# only show on first load
    processQueueOutput($GLOBALS['I18N']->get('Started'), 0);
    if (defined('SYSTEM_TIMEZONE')) {
        processQueueOutput($GLOBALS['I18N']->get('Time now ').date('Y-m-d H:i'));
    }
}
//processQueueOutput('Will process for a maximum of '.$maxProcessQueueTime.' seconds '.MAX_PROCESSQUEUE_TIME);

//# ask plugins if processing is allowed at all
foreach ($GLOBALS['plugins'] as $pluginname => $plugin) {
    //  cl_output('Asking '.$pluginname);
    if (!$plugin->allowProcessQueue()) {
        processQueueOutput(s('Processing blocked by plugin %s', $pluginname));
        // finish() declares three parameters; pass the stage explicitly so
        // the call cannot fail on a missing argument
        finish('info', s('Processing blocked by plugin %s', $pluginname), $script_stage);
        exit;
    }
}

if (empty($reload)) { //# only show on first load
    if (!empty($ISPrestrictions)) {
        processQueueOutput($ISPrestrictions);
    }
    // the ISP can suspend processing by creating the configured lock file
    if (is_file($ISPlockfile)) {
        ProcessError(s('Processing has been suspended by your ISP, please try again later'));
    }
}
530
// A negative batch size means the quota for the current batch period has been
// used up: report it, mark stage 5 with a wait so the page reloads, and stop.
if ($counters['num_per_batch'] < 0) {
    processQueueOutput(s('In the last %s seconds more emails were sent (%s) than is currently allowed per batch (%s)',
        number_format($batch_period), number_format($recently_sent[0]), number_format($original_num_per_batch)), 0, 'progress');
    $processed = 0;
    $script_stage = 5;
    $GLOBALS['wait'] = $batch_period;

    return;
}

if ($counters['num_per_batch'] > 0) {
    if ($original_num_per_batch == $counters['num_per_batch']) {
        processQueueOutput(s('Sending in batches of %s emails', number_format($counters['num_per_batch'])), 0, 'progress');
    } else {
        // this batch is smaller than the configured size, explain why
        if (empty($reload)) {
            processQueueOutput(s('Sending in batches of %s messages', number_format($original_num_per_batch)), 0);
        }
        $diff = max(0, $original_num_per_batch - $counters['num_per_batch']);
        processQueueOutput(s('This batch will be %s emails, because in the last %s seconds %s emails were sent',
            number_format($counters['num_per_batch']), number_format($batch_period), number_format($diff)), 0, 'progress');
    }
}

// counters for this run
$counters['batch_total'] = $counters['num_per_batch'];
$counters['sent'] = 0;
$counters['invalid'] = 0;
$counters['failed_sent'] = 0;

// switched off on purpose (the condition is always false)
if (0 && $reload) {
    processQueueOutput(s('Sent in last run').": $lastsent", 0, 'progress');
    processQueueOutput(s('Skipped in last run').": $lastskipped", 0, 'progress');
}
564
// we are active
$script_stage = 1;
$cannotsend = 0;
$unconfirmed = 0;
$notsent = 0;

//# check for messages that need requeuing: sent campaigns with a requeue
//# interval whose requeueuntil date has not passed yet
$req = Sql_Query(sprintf(
    'select id from %s where requeueinterval > 0 and requeueuntil > now() and status = "sent"',
    $tables['message']
));

while ($msg = Sql_Fetch_Assoc($req)) {
    // back to "submitted", moving the embargo forward by whole requeue
    // intervals until it is later than now
    Sql_query(sprintf(
        'UPDATE %s
      SET status = "submitted",
      sendstart = null,
      embargo = embargo +
        INTERVAL (FLOOR(TIMESTAMPDIFF(MINUTE, embargo, GREATEST(embargo, NOW())) / requeueinterval) + 1) * requeueinterval MINUTE
      WHERE id = %d',
        $GLOBALS['tables']['message'],
        $msg['id']
    ));

    // tell the plugins about it
    foreach ($GLOBALS['plugins'] as $pluginname => $plugin) {
        $plugin->messageReQueued($msg['id']);
    }
    //# @@@ need to update message data as well
}
590
$messagelimit = '';
//# limit the number of campaigns to work on
if (defined('MAX_PROCESS_MESSAGE')) {
    $messagelimit = sprintf(' limit %d ', MAX_PROCESS_MESSAGE);
}

// campaigns that are due: not draft/sent/prepared/suspended and with the embargo passed
$query = ' select id from '.$tables['message'].' where status not in ("draft", "sent", "prepared", "suspended") and embargo <= now() order by entered '.$messagelimit;
if (VERBOSE) {
    processQueueOutput($query);
}
$messages = Sql_query($query);
// check for a query error before the result is used to count rows
if (Sql_Has_Error($database_connection)) {
    ProcessError(Sql_Error($database_connection));
}
$num_messages = Sql_Num_Rows($messages);
if ($num_messages) {
    $counters['status'] = $num_messages;
    if (empty($reload)) {
        processQueueOutput($GLOBALS['I18N']->get('Processing has started,'));
        if ($num_messages == 1) {
            processQueueOutput(s('One campaign to process.'));
        } else {
            processQueueOutput(s('%d campaigns to process.', $num_messages));
        }
    }
    clearPageCache();
    if (!$GLOBALS['commandline'] && empty($reload)) {
        processQueueOutput(s('Please leave this window open.').' '.s('phpList will process your queue until all messages have been sent.').' '.s('This may take a while'));
        if (SEND_QUEUE_PROCESSING_REPORT) {
            processQueueOutput(s('Report of processing will be sent by email'));
        }
    }
} else {
    //# check for a future embargo, to be able to report when it expires.
    // "{$...}" instead of "${...}": the latter interpolation form is deprecated in PHP 8.2
    $future = Sql_Fetch_Assoc_Query('select unix_timestamp(embargo) - unix_timestamp(now()) as waittime '
        ." from {$tables['message']}"
        ." where status not in ('draft', 'sent', 'prepared', 'suspended')"
        .' and embargo > now()'
        .' order by embargo asc limit 1');
    if ($future) {
        $counters['status'] = 'embargo';
        $counters['delaysend'] = $future['waittime'];
    }
}

$script_stage = 2; // we know the messages to process
//include_once "footer.inc";
// no batch size in effect: use a very large one for the loop below
if (empty($counters['num_per_batch'])) {
    $counters['num_per_batch'] = 10000000;
}
641
642while ($message = Sql_fetch_array($messages)) {
643    ++$counters['campaign'];
644    $throttlecount = 0;
645
646    $messageid = $message['id'];
647    $counters['sent_users_for_message '.$messageid] = 0;
648    $counters['total_users_for_message '.$messageid] = 0;
649    if (PROCESSCAMPAIGNS_PARALLEL) {
650        $counters['max_users_for_message '.$messageid] = (int) $counters['num_per_batch'] / $num_messages; //# not entirely correct if a campaign has less left
651        if (VERBOSE) {
652            cl_output(s('Maximum for campaign %d is %d', $messageid, $counters['max_users_for_message '.$messageid]));
653        }
654    } else {
655        $counters['max_users_for_message '.$messageid] = 0;
656    }
657
658    $counters['processed_users_for_message '.$messageid] = 0;
659    $counters['failed_sent_for_message '.$messageid] = 0;
660
661    if (!empty($getspeedstats)) {
662        processQueueOutput('start send '.$messageid);
663    }
664
665    $msgdata = loadMessageData($messageid);
666    foreach ($GLOBALS['plugins'] as $pluginname => $plugin) {
667        $plugin->campaignStarted($msgdata);
668    }
669
670    if (!empty($msgdata['resetstats'])) {
671        resetMessageStatistics($msgdata['id']);
672        //# make sure to reset the resetstats flag, so it doesn't clear it every run
673        setMessageData($msgdata['id'], 'resetstats', 0);
674    }
675
676    //# check the end date of the campaign
677    $stopSending = false;
678    if (!empty($msgdata['finishsending'])) {
679        $finishSendingBefore = mktime($msgdata['finishsending']['hour'], $msgdata['finishsending']['minute'], 0,
680            $msgdata['finishsending']['month'], $msgdata['finishsending']['day'], $msgdata['finishsending']['year']);
681        $secondsTogo = $finishSendingBefore - time();
682        $stopSending = $secondsTogo < 0;
683        if (empty($reload)) {
684            //## Hmm, this is probably incredibly confusing. It won't finish then
685            if (VERBOSE) {
686                processQueueOutput(sprintf($GLOBALS['I18N']->get('sending of this campaign will stop, if it is still going in %s'),
687                    secs2time($secondsTogo)));
688            }
689        }
690    }
691
692    $userselection = $msgdata['userselection']; //# @@ needs more work
693    //# load message in cache
694    if (!precacheMessage($messageid)) {
695        //# precache may fail on eg invalid remote URL
696        //# any reporting needed here?
697
698        // mark the message as suspended
699        Sql_Query(sprintf('update %s set status = "suspended" where id = %d', $GLOBALS['tables']['message'],
700            $messageid));
701        processQueueOutput(s('Error loading message, please check the eventlog for details'));
702        if (MANUALLY_PROCESS_QUEUE) {
703            // wait a little, otherwise the message won't show
704            sleep(10);
705        }
706        continue;
707    }
708
709    if (!empty($getspeedstats)) {
710        processQueueOutput('message data loaded ');
711    }
712    if (VERBOSE) {
713        //   processQueueOutput($msgdata);
714    }
715    if (!empty($msgdata['notify_start']) && !isset($msgdata['start_notified'])) {
716        $notifications = explode(',', $msgdata['notify_start']);
717        foreach ($notifications as $notification) {
718            sendMail($notification, s('Campaign started'),
719                s('phplist has started sending the campaign with subject %s', $msgdata['subject'])."\n\n".
720                s('to view the progress of this campaign, go to %s://%s', $GLOBALS['admin_scheme'],
721                    hostName().$GLOBALS['adminpages'].'/?page=messages&amp;tab=active'));
722        }
723        Sql_Query(sprintf('insert ignore into %s (name,id,data) values("start_notified",%d,now())',
724            $GLOBALS['tables']['messagedata'], $messageid));
725    }
726
727    if (empty($reload)) {
728        processQueueOutput($GLOBALS['I18N']->get('Processing message').' '.$messageid);
729    }
730
731    flush();
732    keepLock($send_process_id);
733    $status = Sql_Query(sprintf('update %s set status = "inprocess" where id = %d', $tables['message'], $messageid));
734    $sendstart = Sql_Query(sprintf('update %s set sendstart = now() where sendstart is null and id = %d',
735        $tables['message'], $messageid));
736    if (empty($reload)) {
737        processQueueOutput($GLOBALS['I18N']->get('Looking for users'));
738    }
739    if (Sql_Has_Error($database_connection)) {
740        ProcessError(Sql_Error($database_connection));
741    }
742
743    // make selection on attribute, users who at least apply to the attributes
744    // lots of ppl seem to use it as a normal mailinglist system, and do not use attributes.
745    // Check this and take anyone in that case.
746
747    //# keep an eye on how long it takes to find users, and warn if it's a long time
    // Timer reading taken before the subscriber search; compared with a second
    // reading afterwards to warn when the search is slow.
    $findUserStart = $processqueue_timer->elapsed(1);

    // Number of rows in the attribute table; zero means attributes are not in use.
    $rs = Sql_Query('select count(*) from '.$tables['attribute']);
    $numattr = Sql_Fetch_Row($rs);

    // Extra SQL condition appended to the subscriber query further down;
    // stays empty when there is no attribute selection.
    $user_attribute_query = ''; //16552
    if ($userselection && $numattr[0]) {
        // $userselection is run as a complete SQL query; the first column of
        // each row is used as a subscriber id.
        $res = Sql_Query($userselection);
        $counters['total_users_for_message'] = Sql_Num_Rows($res);
        if (empty($reload)) {
            processQueueOutput($counters['total_users_for_message'].' '.$GLOBALS['I18N']->get('users apply for attributes, now checking lists'),
                0, 'progress');
        }
        // Build a comma separated id list for an "in (...)" clause.
        $user_list = '';
        while ($row = Sql_Fetch_row($res)) {
            $user_list .= $row[0].',';
        }
        // Drop the trailing comma left by the loop.
        $user_list = substr($user_list, 0, -1);
        if ($user_list) {
            $user_attribute_query = " and listuser.userid in ($user_list)";
        } else {
            // Nobody matches the attribute selection: mark the campaign as sent
            // and leave this script.
            if (empty($reload)) {
                processQueueOutput($GLOBALS['I18N']->get('No users apply for attributes'));
            }
            $status = Sql_Query(sprintf('update %s set status = "sent", sent = now() where id = %d', $tables['message'],
                $messageid));
            finish('info', "Message $messageid: \nNo users apply for attributes, ie nothing to do");
            $script_stage = 6;
            // we should actually continue with the next message
            return;
        }
    }
    // Only ever raise the stage marker, never lower it.
    if ($script_stage < 3) {
        $script_stage = 3; // we know the users by attribute
    }
783
784    // when using commandline we need to exclude users who have already received
785    // the email
786    // we don't do this otherwise because it slows down the process, possibly
787    // causing us to not find anything at all
788    $exclusion = '';
789    $doneusers = array();
790    $skipusers = array();
791
792//# 8478, avoid building large array in memory, when sending large amounts of users.
793
794    /*
795      $req = Sql_Query("select userid from {$tables["usermessage"]} where messageid = $messageid");
796      $skipped = Sql_Affected_Rows();
797      if ($skipped < 10000) {
798        while ($row = Sql_Fetch_Row($req)) {
799          $alive = checkLock($send_process_id);
800          if ($alive)
801            keepLock($send_process_id);
802          else
803            ProcessError($GLOBALS['I18N']->get('Process Killed by other process'));
804          array_push($doneusers,$row[0]);
805        }
806      } else {
807        processQueueOutput($GLOBALS['I18N']->get('Warning, disabling exclusion of done users, too many found'));
808        logEvent($GLOBALS['I18N']->get('Warning, disabling exclusion of done users, too many found'));
809      }
810
811      # also exclude unconfirmed users, otherwise they'll block the process
812      # will give quite different statistics than when used web based
813    #  $req = Sql_Query("select id from {$tables["user"]} where !confirmed");
814    #  while ($row = Sql_Fetch_Row($req)) {
815    #    array_push($doneusers,$row[0]);
816    #  }
817      if (sizeof($doneusers))
818        $exclusion = " and listuser.userid not in (".join(",",$doneusers).")";
819    */
820
821    if (USE_LIST_EXCLUDE) {
822        if (VERBOSE) {
823            processQueueOutput($GLOBALS['I18N']->get('looking for users who can be excluded from this mailing'));
824        }
825        if (count($msgdata['excludelist'])) {
826            $query
827                = ' select userid'
828                .' from '.$GLOBALS['tables']['listuser']
829                .' where listid in ('.implode(',', $msgdata['excludelist']).')';
830            if (VERBOSE) {
831                processQueueOutput('Exclude query '.$query);
832            }
833            $req = Sql_Query($query);
834            while ($row = Sql_Fetch_Row($req)) {
835                $um = Sql_Query(sprintf('replace into %s (entered,userid,messageid,status) values(now(),%d,%d,"excluded")',
836                    $tables['usermessage'], $row[0], $messageid));
837            }
838        }
839    }
840
841    /*
842      ## 8478
843      $query = sprintf('select distinct user.id from
844        %s as listuser,
845        %s as user,
846        %s as listmessage
847        where
848        listmessage.messageid = %d and
849        listmessage.listid = listuser.listid and
850        user.id = listuser.userid %s %s %s',
851        $tables['listuser'],$tables["user"],$tables['listmessage'],
852        $messageid,
853        $userconfirmed,
854        $exclusion,
855        $user_attribute_query);*/
    // Number of subscribers already queued with status "todo" for this campaign;
    // only looked up when MESSAGEQUEUE_PREPARE is enabled.
    $queued = 0;
    if (defined('MESSAGEQUEUE_PREPARE') && MESSAGEQUEUE_PREPARE) {
        // If rows are found, this same $query is what gets executed further
        // down to fetch the subscribers.
        $query = sprintf('select userid from '.$tables['usermessage'].' where messageid = %d and status = "todo"',
            $messageid);
        $queued_count = Sql_Query($query);
        // NOTE(review): the row count of a select is read via Sql_Affected_Rows()
        // here -- confirm the database layer reports it that way.
        $queued = Sql_Affected_Rows();
        // if (VERBOSE) {
        cl_output('found pre-queued subscribers '.$queued, 0, 'progress');
        //  }
    }

    //# if the above didn't find any, run the normal search (again)
    if (empty($queued)) {
        //# remove pre-queued messages, otherwise they wouldn't go out
        Sql_Query(sprintf('delete from '.$tables['usermessage'].' where messageid = %d and status = "todo"',
            $messageid));
        $removed = Sql_Affected_Rows();
        if ($removed) {
            cl_output('removed pre-queued subscribers '.$removed, 0, 'progress');
        }

        // Select every confirmed, non-blacklisted, non-disabled member of a list
        // the campaign is sent to who has no usermessage row for this campaign
        // yet (left join + "um.userid IS NULL"). $exclusion and
        // $user_attribute_query are appended as extra conditions.
        $query = sprintf('select distinct u.id from %s as listuser
        inner join %s as u ON u.id = listuser.userid
        inner join %s as listmessage ON listuser.listid = listmessage.listid
        left join %s as um ON (um.messageid = %d and um.userid = listuser.userid)
        where
        listmessage.messageid = %d
        and listmessage.listid = listuser.listid
        and u.id = listuser.userid
        and um.userid IS NULL
        and u.confirmed and !u.blacklisted and !u.disabled
        %s %s',
            $tables['listuser'],
            $tables['user'],
            $tables['listmessage'],
            $tables['usermessage'],
            $messageid, $messageid,
            $exclusion, $user_attribute_query
        );
    }

    if (VERBOSE) {
        processQueueOutput('User select query '.$query);
    }
900
901    $userids = Sql_Query($query);
902    if (Sql_Has_Error($database_connection)) {
903        ProcessError(Sql_Error($database_connection));
904    }
905
906    // now we have all our users to send the message to
907    $counters['total_users_for_message '.$messageid] = Sql_Affected_Rows();
908
909    if ($skipped >= 10000) {
910        $counters['total_users_for_message '.$messageid] -= $skipped;
911    }
912
913    $findUserEnd = $processqueue_timer->elapsed(1);
914
915    if ($findUserEnd - $findUserStart > 300 && !$GLOBALS['commandline']) {
916        processQueueOutput($GLOBALS['I18N']->get('Warning, finding the subscribers to send out to takes a long time, consider changing to commandline sending'));
917    }
918
919    if (empty($reload)) {
920        processQueueOutput($GLOBALS['I18N']->get('Found them').': '.$counters['total_users_for_message '.$messageid].' '.$GLOBALS['I18N']->get('to process'));
921    }
922    setMessageData($messageid, 'to process', $counters['total_users_for_message '.$messageid]);
923
    // Runs only when MESSAGEQUEUE_PREPARE is on and no "todo" rows existed yet
    // for this campaign.
    if (defined('MESSAGEQUEUE_PREPARE') && MESSAGEQUEUE_PREPARE && empty($queued)) {
        //# experimental MESSAGEQUEUE_PREPARE will first mark all messages as todo and then work its way through the todos
        //# that should save time when running the queue multiple times, which avoids the user search after the first time
        //# only do this first time, ie empty($queued);
        //# the last run will pick up changes
        while ($userdata = Sql_Fetch_Row($userids)) {
            //# mark message/user combination as "todo"
            $userid = $userdata[0];    // id of the user
            Sql_Query(sprintf('replace into %s (entered,userid,messageid,status) values(now(),%d,%d,"todo")',
                $tables['usermessage'], $userid, $messageid));
        }
        //# rerun the initial query, in order to continue as normal
        // The loop above consumed the result set, so $userids and the total are
        // reloaded from the freshly written "todo" rows.
        $query = sprintf('select userid from '.$tables['usermessage'].' where messageid = %d and status = "todo"',
            $messageid);
        $userids = Sql_Query($query);
        $counters['total_users_for_message '.$messageid] = Sql_Affected_Rows();
    }
941
942    while ($userdata = Sql_Fetch_Row($userids)) {
        $userid = $userdata[0];    // id of the user

        /*
         * when parallel processing stop when the number sent for this message reaches the limit
         */
        // A max of 0 (falsy) means no per-campaign limit is applied.
        if ($counters['max_users_for_message '.$messageid]
            && $counters['sent_users_for_message '.$messageid] >= $counters['max_users_for_message '.$messageid]
        ) {
            if (VERBOSE) {
                cl_output(s('Limit for this campaign reached: %d (%d)',
                    $counters['sent_users_for_message '.$messageid],
                    $counters['max_users_for_message '.$messageid]));
            }
            // Leave the subscriber loop only; the code after the loop still runs.
            break;
        }
        /*
         * when batch processing stop when the number sent reaches the batch limit
         */
        if ($counters['num_per_batch'] && $counters['sent'] >= $counters['num_per_batch']) {
            processQueueOutput(s('batch limit reached').': '.$counters['sent'].' ('.$counters['num_per_batch'].')',
                1, 'progress');
            // Publish the batch period through the "wait" global before stopping.
            $GLOBALS['wait'] = $batch_period;

            // Unlike the break above, this leaves the whole script.
            return;
        }
968        $failure_reason = '';
969
970        if (!empty($getspeedstats)) {
971            processQueueOutput('-----------------------------------'."\n".'start process user '.$userid);
972        }
973        $some = 1;
974        set_time_limit(120);
975
        // Seconds left until the campaign's "finish sending before" timestamp;
        // a negative value means the deadline has passed.
        $secondsTogo = $finishSendingBefore - time();
        $stopSending = $secondsTogo < 0;

        // check if we have been "killed"
        //   processQueueOutput('Process ID '.$send_process_id);
        $alive = checkLock($send_process_id);

        //# check for max-process-queue-time
        $elapsed = $GLOBALS['processqueue_timer']->elapsed(1);
        // Order of the checks below:
        //  1. max processing time exceeded and at least one message sent -> stop the loop
        //  2. lock still ours and deadline not passed -> keepLock() and carry on
        //  3. deadline passed -> stop the loop
        //  4. otherwise the lock check failed -> report an error
        if ($maxProcessQueueTime && $elapsed > $maxProcessQueueTime && $counters['sent'] > 0) {
            cl_output($GLOBALS['I18N']->get('queue processing time has exceeded max processing time ').$maxProcessQueueTime);
            break;
        } elseif ($alive && !$stopSending) {
            keepLock($send_process_id);
        } elseif ($stopSending) {
            processQueueOutput($GLOBALS['I18N']->get('Campaign sending timed out, is past date to process until'));
            break;
        } else {
            ProcessError($GLOBALS['I18N']->get('Process Killed by other process'));
        }

        // check if the message we are working on is still there and in process
        // $status is reused here as the fetched row (id and status of the campaign).
        $status = Sql_Fetch_Array_query("select id,status from {$tables['message']} where id = $messageid");
        if (!$status['id']) {
            ProcessError($GLOBALS['I18N']->get('Message I was working on has disappeared'));
        } elseif ($status['status'] != 'inprocess') {
            // Any status other than "inprocess" is reported as a suspended campaign.
            $script_stage = 6;
            ProcessError($GLOBALS['I18N']->get('Sending of this message has been suspended'));
        }
        flush();
1006
1007        //#
1008        //Sql_Query(sprintf('delete from %s where userid = %d and messageid = %d and status = "active"',$tables['usermessage'],$userid,$messageid));
1009
1010        // check whether the user has already received the message
1011        if (!empty($getspeedstats)) {
1012            processQueueOutput('verify message can go out to '.$userid);
1013        }
1014
1015        $um = Sql_Query(sprintf('select entered from %s where userid = %d and messageid = %d and status != "todo"',
1016            $tables['usermessage'], $userid, $messageid));
1017        if (!Sql_Num_Rows($um)) {
1018            //# mark this message that we're working on it, so that no other process will take it
1019            //# between two lines ago and here, should hopefully be quick enough
1020            $userlock = Sql_Query(sprintf('replace into %s (entered,userid,messageid,status) values(now(),%d,%d,"active")',
1021                $tables['usermessage'], $userid, $messageid));
1022
1023            if ($script_stage < 4) {
1024                $script_stage = 4; // we know a subscriber to send to
1025            }
1026            $someusers = 1;
1027            $users = Sql_query("select id,email,uniqid,htmlemail,confirmed,blacklisted,disabled from {$tables['user']} where id = $userid");
1028
1029            // pick the first one (rather historical from before email was unique)
1030            $user = Sql_fetch_Assoc($users);
1031            if ($user['confirmed'] && is_email($user['email'])) {
1032                $userid = $user['id'];    // id of the subscriber
1033                $useremail = $user['email']; // email of the subscriber
1034                $userhash = $user['uniqid'];  // unique string of the user
1035                $htmlpref = $user['htmlemail'];  // preference for HTML emails
1036                $confirmed = $user['confirmed'] && !$user['disabled']; //# 7 = disabled flag
1037                $blacklisted = $user['blacklisted'];
1038                $msgdata['counters'] = $counters;
1039
1040                $cansend = !$blacklisted && $confirmed;
1041                /*
1042                ## Ask plugins if they are ok with sending this message to this user
1043                */
1044                if (!empty($getspeedstats)) {
1045                    processQueueOutput('start check plugins ');
1046                }
1047
1048                reset($GLOBALS['plugins']);
1049                while ($cansend && $plugin = current($GLOBALS['plugins'])) {
1050                    $cansend = $plugin->canSend($msgdata, $user);
1051                    if (!$cansend) {
1052                        $failure_reason .= 'Sending blocked by plugin '.$plugin->name;
1053                        $counterIndex = 'send blocked by '.$plugin->name;
1054                        if (!isset($counters[$counterIndex])) {
1055                            $counters[$counterIndex] = 0;
1056                        }
1057                        ++$counters[$counterIndex];
1058                        if (VERBOSE) {
1059                            cl_output('Sending blocked by plugin '.$plugin->name);
1060                        }
1061                    }
1062
1063                    next($GLOBALS['plugins']);
1064                }
1065                if (!empty($getspeedstats)) {
1066                    processQueueOutput('end check plugins ');
1067                }
1068
1069//###################################
1070// Throttling
1071
                // Set to a truthy value when this subscriber's domain has already
                // received DOMAIN_BATCH_SIZE messages in the current interval.
                $throttled = 0;
                if ($cansend && USE_DOMAIN_THROTTLE) {
                    // Domain part of the address; the first plugin that returns a
                    // mapping replaces it with its own throttle domain.
                    list($mailbox, $throttleDomain) = explode('@', $useremail);
                    foreach ($GLOBALS['plugins'] as $pluginname => $plugin) {
                        if ($newThrottleDomain = $plugin->throttleDomainMap($throttleDomain)) {
                            $throttleDomain = $newThrottleDomain;
                            break;
                        }
                    }
                    // Start of the current throttle window: "now" rounded down to a
                    // multiple of DOMAIN_BATCH_PERIOD seconds.
                    $now = time();
                    $interval = $now - ($now % DOMAIN_BATCH_PERIOD);
                    if (!isset($domainthrottle[$throttleDomain]) || !is_array($domainthrottle[$throttleDomain])) {
                        // First time this domain is seen in this run: create its bookkeeping entry.
                        $domainthrottle[$throttleDomain] = array(
                            'interval'  => '',
                            'sent'      => 0,
                            'attempted' => 0,
                        );
                    } elseif (isset($domainthrottle[$throttleDomain]['interval']) && $domainthrottle[$throttleDomain]['interval'] == $interval) {
                        // Same window as the last successful send to this domain:
                        // compare the sent count against the per-window limit.
                        $throttled = $domainthrottle[$throttleDomain]['sent'] >= DOMAIN_BATCH_SIZE;
                        if ($throttled) {
                            // NOTE(review): this counter is not initialised in the visible
                            // code -- confirm it is set up before the loop.
                            ++$counters['send blocked by domain throttle'];
                            ++$domainthrottle[$throttleDomain]['attempted'];
                            if (DOMAIN_AUTO_THROTTLE
                                && $domainthrottle[$throttleDomain]['attempted'] > 25 // skip a few before auto throttling
                                && $num_messages <= 1 // only do this when there's only one message to process otherwise the other ones don't get a chance
                                && $counters['total_users_for_message '.$messageid] < 1000 // and also when there's not too many left, because then it's likely they're all being throttled
                            ) {
                                $domainthrottle[$throttleDomain]['attempted'] = 0;
                                // NOTE(review): the logged text says "more than 10 attempts"
                                // while the condition above requires more than 25.
                                logEvent(sprintf($GLOBALS['I18N']->get('There have been more than 10 attempts to send to %s that have been blocked for domain throttling.'),
                                    $throttleDomain));
                                logEvent($GLOBALS['I18N']->get('Introducing extra delay to decrease throttle failures'));
                                if (VERBOSE) {
                                    processQueueOutput($GLOBALS['I18N']->get('Introducing extra delay to decrease throttle failures'));
                                }
                                // The extra delay starts from MAILQUEUE_THROTTLE and grows by a
                                // quarter of the per-message share of the window each time.
                                if (!isset($running_throttle_delay)) {
                                    $running_throttle_delay = (int) (MAILQUEUE_THROTTLE + (DOMAIN_BATCH_PERIOD / (DOMAIN_BATCH_SIZE * 4)));
                                } else {
                                    $running_throttle_delay += (int) (DOMAIN_BATCH_PERIOD / (DOMAIN_BATCH_SIZE * 4));
                                }
                                //processQueueOutput("Running throttle delay: ".$running_throttle_delay);
                            } elseif (VERBOSE) {
                                processQueueOutput(sprintf($GLOBALS['I18N']->get('%s is currently over throttle limit of %d per %d seconds').' ('.$domainthrottle[$throttleDomain]['sent'].')',
                                    $throttleDomain, DOMAIN_BATCH_SIZE, DOMAIN_BATCH_PERIOD));
                            }
                        }
                    }
                }
1119
1120                if ($cansend) {
1121                    $success = 0;
1122                    if (!TEST) {
1123                        reset($GLOBALS['plugins']);
1124                        while (!$throttled && $plugin = current($GLOBALS['plugins'])) {
1125                            $throttled = $plugin->throttleSend($msgdata, $user);
1126                            if ($throttled) {
1127                                if (!isset($counters['send throttled by plugin '.$plugin->name])) {
1128                                    $counters['send throttled by plugin '.$plugin->name] = 0;
1129                                }
1130                                ++$counters['send throttled by plugin '.$plugin->name];
1131                                $failure_reason .= 'Sending throttled by plugin '.$plugin->name;
1132                            }
1133                            next($GLOBALS['plugins']);
1134                        }
1135                        if (!$throttled) {
1136                            if (VERBOSE) {
1137                                processQueueOutput($GLOBALS['I18N']->get('Sending').' '.$messageid.' '.$GLOBALS['I18N']->get('to').' '.$useremail);
1138                            }
1139                            $emailSentTimer = new timer();
1140                            ++$counters['batch_count'];
1141                            $success = sendEmail($messageid, $useremail, $userhash,
1142                                $htmlpref); // $rssitems Obsolete by rssmanager plugin
1143                            if (!$success) {
1144                                ++$counters['sendemail returned false total'];
1145                                ++$counters['sendemail returned false'];
1146                            } else {
1147                                $counters['sendemail returned false'] = 0;
1148                            }
1149                            if ($counters['sendemail returned false'] > 10) {
1150                                foreach ($GLOBALS['plugins'] as $pluginname => $plugin) {
1151                                    $plugin->processError(s('Warning: a lot of errors while sending campaign %d',
1152                                        $messageid));
1153                                }
1154                            }
1155
1156                            if (VERBOSE) {
1157                                processQueueOutput($GLOBALS['I18N']->get('It took').' '.$emailSentTimer->elapsed(1).' '.$GLOBALS['I18N']->get('seconds to send'));
1158                            }
1159                        } else {
1160                            ++$throttlecount;
1161                        }
1162                    } else {
1163                        $success = sendEmailTest($messageid, $useremail);
1164                        ++$counters['sentastest'];
1165                        ++$counters['batch_count'];
1166                        setMessageData($messageid, 'sentastest', $counters['sentastest']);
1167                    }
1168
1169                    //############################
1170                    // tried to send email , process succes / failure
1171                    if ($success) {
1172                        if (USE_DOMAIN_THROTTLE) {
1173                            if ($domainthrottle[$throttleDomain]['interval'] != $interval) {
1174                                $domainthrottle[$throttleDomain]['interval'] = $interval;
1175                                $domainthrottle[$throttleDomain]['sent'] = 1;
1176                            } else {
1177                                ++$domainthrottle[$throttleDomain]['sent'];
1178                            }
1179                        }
1180                        ++$counters['sent'];
1181                        ++$counters['sent_users_for_message '.$messageid];
1182                        $um = Sql_Query(sprintf('replace into %s (entered,userid,messageid,status) values(now(),%d,%d,"sent")',
1183                            $tables['usermessage'], $userid, $messageid));
1184                    } else {
1185                        ++$counters['failed_sent'];
1186                        ++$counters['failed_sent_for_message '.$messageid];
1187                        //# need to check this, the entry shouldn't be there in the first place, so no need to delete it
1188                        //# might be a cause for duplicated emails
1189                        if (defined('MESSAGEQUEUE_PREPARE') && MESSAGEQUEUE_PREPARE) {
1190                            Sql_Query(sprintf('update %s set status = "todo" where userid = %d and messageid = %d and status = "active"',
1191                                $tables['usermessage'], $userid, $messageid));
1192                        } else {
1193                            Sql_Query(sprintf('delete from %s where userid = %d and messageid = %d and status = "active"',
1194                                $tables['usermessage'], $userid, $messageid));
1195                        }
1196                        if (VERBOSE) {
1197                            processQueueOutput($GLOBALS['I18N']->get('Failed sending to').' '.$useremail);
1198                            logEvent("Failed sending message $messageid to $useremail");
1199                        }
1200                        // make sure it's not because it's an underdeliverable email
1201                        // unconfirm this user, so they're not included next time
1202                        if (!$throttled && !validateEmail($useremail)) {
1203                            ++$unconfirmed;
1204                            ++$counters['email address invalidated'];
1205                            logEvent("invalid email address $useremail user marked unconfirmed");
1206                            Sql_Query(sprintf('update %s set confirmed = 0 where email = "%s"',
1207                                $GLOBALS['tables']['user'], $useremail));
1208                        }
1209                    }
1210
1211                    if ($script_stage < 5) {
1212                        $script_stage = 5; // we have actually sent one user
1213                    }
1214                    if (isset($running_throttle_delay)) {
1215                        sleep($running_throttle_delay);
1216                        if ($counters['sent'] % 5 == 0) {
1217                            // retry running faster after some more messages, to see if that helps
1218                            unset($running_throttle_delay);
1219                        }
1220                    } elseif (MAILQUEUE_THROTTLE) {
1221                        usleep(MAILQUEUE_THROTTLE * 1000000);
1222                    } elseif (MAILQUEUE_BATCH_SIZE && MAILQUEUE_AUTOTHROTTLE) {
1223                        $totaltime = $GLOBALS['processqueue_timer']->elapsed(1);
1224                        $msgperhour = (3600 / $totaltime) * $counters['sent'];
1225                        $msgpersec = $msgperhour / 3600;
1226
1227                        //#11336 - this may cause "division by 0", but 'secpermsg' isn't used at all
1228                        //  $secpermsg = $totaltime / $counters['sent'];
1229                        $target = (MAILQUEUE_BATCH_PERIOD / MAILQUEUE_BATCH_SIZE) * $counters['sent'];
1230                        $delay = $target - $totaltime;
1231
1232                        if ($delay > 0) {
1233                            if (VERBOSE) {
1234                                /* processQueueOutput($GLOBALS['I18N']->get('waiting for').' '.$delay.' '.$GLOBALS['I18N']->get('seconds').' '.
1235                               $GLOBALS['I18N']->get('to make sure we don\'t exceed our limit of ').MAILQUEUE_BATCH_SIZE.' '.
1236                               $GLOBALS['I18N']->get('messages in ').' '.MAILQUEUE_BATCH_PERIOD.$GLOBALS['I18N']->get('seconds')); */
1237                                processQueueOutput(sprintf($GLOBALS['I18N']->get('waiting for %.1f seconds to meet target of %s seconds per message'),
1238                                        $delay, (MAILQUEUE_BATCH_PERIOD / MAILQUEUE_BATCH_SIZE))
1239                                );
1240                            }
1241                            usleep($delay * 1000000);
1242                        }
1243                    }
1244                } else {
1245                    ++$cannotsend;
1246                    // mark it as sent anyway, because otherwise the process will never finish
1247                    if (VERBOSE) {
1248                        processQueueOutput($GLOBALS['I18N']->get('not sending to ').$useremail);
1249                    }
1250                    $um = Sql_query("replace into {$tables['usermessage']} (entered,userid,messageid,status) values(now(),$userid,$messageid,\"not sent\")");
1251                }
1252
1253                // update possible other users matching this email as well,
1254                // to avoid duplicate sending when people have subscribed multiple times
1255                // bit of legacy code after making email unique in the database
1256                //        $emails = Sql_query("select * from {$tables['user']} where email =\"$useremail\"");
1257                //        while ($email = Sql_fetch_row($emails))
1258                //          Sql_query("replace into {$tables['usermessage']} (userid,messageid) values($email[0],$messageid)");
1259            } else {
1260                // some "invalid emails" are entirely empty, ah, that is because they are unconfirmed
1261
1262                //# this is quite old as well, with the preselection that avoids unconfirmed users
1263                // it is unlikely this is every processed.
1264
1265                if (!$user['confirmed'] || $user['disabled']) {
1266                    if (VERBOSE) {
1267                        processQueueOutput($GLOBALS['I18N']->get('Unconfirmed user').': '.$userid.' '.$user['email'].' '.$user['id']);
1268                    }
1269                    ++$unconfirmed;
1270                    // when running from commandline we mark it as sent, otherwise we might get
1271                    // stuck when using batch processing
1272                    // if ($GLOBALS["commandline"]) {
1273                    $um = Sql_query("replace into {$tables['usermessage']} (entered,userid,messageid,status) values(now(),$userid,$messageid,\"unconfirmed user\")");
1274                    // }
1275                } elseif ($user['email'] || $user['id']) {
1276                    if (VERBOSE) {
1277                        processQueueOutput(s('Invalid email address').': '.$user['email'].' '.$user['id']);
1278                    }
1279                    logEvent(s('Invalid email address').': userid  '.$user['id'].'  email '.$user['email']);
1280                    // mark it as sent anyway
1281                    if ($user['id']) {
1282                        $um = Sql_query(sprintf('replace into %s (entered,userid,messageid,status) values(now(),%d,%d,"invalid email address")',
1283                            $tables['usermessage'], $userid, $messageid));
1284                        Sql_Query(sprintf('update %s set confirmed = 0 where id = %d',
1285                            $GLOBALS['tables']['user'], $user['id']));
1286                        addUserHistory(
1287                            $user['email'],
1288                            s('Subscriber marked unconfirmed for invalid email address'),
1289                            s('Marked unconfirmed while sending campaign %d', $messageid)
1290                        );
1291                    }
1292                    ++$counters['invalid'];
1293                }
1294            }
1295        } else {
1296
1297            //# and this is quite historical, and also unlikely to be every called
1298            // because we now exclude users who have received the message from the
1299            // query to find users to send to
1300
1301            //# when trying to send the message, it was already marked for this user
1302            //# June 2010, with the multiple send process extension, that's quite possible to happen again
1303
1304            $um = Sql_Fetch_Row($um);
1305            ++$notsent;
1306            if (VERBOSE) {
1307                processQueueOutput($GLOBALS['I18N']->get('Not sending to').' '.$userid.', '.$GLOBALS['I18N']->get('already sent').' '.$um[0]);
1308            }
1309        }
1310        $status = Sql_query("update {$tables['message']} set processed = processed + 1 % 16000000 where id = $messageid");
1311        $processed = $notsent + $counters['sent'] + $counters['invalid'] + $unconfirmed + $cannotsend + $counters['failed_sent'];
1312        //if ($processed % 10 == 0) {
1313        if (0) {
1314            processQueueOutput('AR'.$affrows.' N '.$counters['total_users_for_message '.$messageid].' P'.$processed.' S'.$counters['sent'].' N'.$notsent.' I'.$counters['invalid'].' U'.$unconfirmed.' C'.$cannotsend.' F'.$counters['failed_sent']);
1315            $rn = $reload * $counters['num_per_batch'];
1316            processQueueOutput('P '.$processed.' N'.$counters['total_users_for_message '.$messageid].' NB'.$counters['num_per_batch'].' BT'.$batch_total.' R'.$reload.' RN'.$rn);
1317        }
1318        /*
1319         * don't calculate this here, but in the "msgstatus" instead, so that
1320         * the total speed can be calculated, eg when there are multiple send processes
1321         *
1322         * re-added for commandline outputting
1323         */
1324
1325        $totaltime = $GLOBALS['processqueue_timer']->elapsed(1);
1326        if ($counters['sent'] > 0) {
1327            $msgperhour = (3600 / $totaltime) * $counters['sent'];
1328            $secpermsg = $totaltime / $counters['sent'];
1329            $timeleft = ($counters['total_users_for_message '.$messageid] - $counters['sent']) * $secpermsg;
1330            $eta = date('D j M H:i', time() + $timeleft);
1331        } else {
1332            $msgperhour = 0;
1333            $secpermsg = 0;
1334            $timeleft = 0;
1335            $eta = $GLOBALS['I18N']->get('unknown');
1336        }
1337        ++$counters['processed_users_for_message '.$messageid];
1338        setMessageData($messageid, 'ETA', $eta);
1339        setMessageData($messageid, 'msg/hr', "$msgperhour");
1340
1341        cl_progress('sent '.$counters['sent'].' ETA '.$eta.' sending '.sprintf('%d',
1342                $msgperhour).' msg/hr');
1343
1344        setMessageData($messageid, 'to process',
1345            $counters['total_users_for_message '.$messageid] - $counters['processed_users_for_message '.$messageid]);
1346        setMessageData($messageid, 'last msg sent', time());
1347        //  setMessageData($messageid,'totaltime',$GLOBALS['processqueue_timer']->elapsed(1));
1348        if (!empty($getspeedstats)) {
1349            processQueueOutput('end process user '."\n".'-----------------------------------'."\n".$userid);
1350        }
1351    }
1352    $processed = $notsent + $counters['sent'] + $counters['invalid'] + $unconfirmed + $cannotsend + $counters['failed_sent'];
1353    processQueueOutput(s('Processed %d out of %d subscribers', $counters['processed_users_for_message '.$messageid],
1354        $counters['total_users_for_message '.$messageid]), 1, 'progress');
1355
1356    if ($counters['total_users_for_message '.$messageid] - $counters['processed_users_for_message '.$messageid] <= 0 || $stopSending) {
1357        // this message is done
1358        if (!$someusers) {
1359            processQueueOutput($GLOBALS['I18N']->get('Hmmm, No users found to send to'), 1, 'progress');
1360        }
1361        if (!$counters['failed_sent']) {
1362            repeatMessage($messageid);
1363            foreach ($GLOBALS['plugins'] as $pluginname => $plugin) {
1364                $plugin->processSendingCampaignFinished($messageid, $msgdata);
1365            }
1366            $status = Sql_query(sprintf('update %s set status = "sent",sent = now() where id = %d',
1367                $GLOBALS['tables']['message'], $messageid));
1368
1369            if (!empty($msgdata['notify_end']) && !isset($msgdata['end_notified'])) {
1370                $notifications = explode(',', $msgdata['notify_end']);
1371                foreach ($notifications as $notification) {
1372                    sendMail($notification, $GLOBALS['I18N']->get('Message campaign finished'),
1373                        s('phpList has finished sending the campaign with subject %s', $msgdata['subject'])."\n\n".
1374                        s('to view the statistics of this campaign, go to %s://%s', $GLOBALS['admin_scheme'],
1375                            getConfig('website').$GLOBALS['adminpages'].'/?page=statsoverview&id='.$messageid)
1376                    );
1377                }
1378                Sql_Query(sprintf('insert ignore into %s (name,id,data) values("end_notified",%d,now())',
1379                    $GLOBALS['tables']['messagedata'], $messageid));
1380            }
1381            $rs = Sql_Query(sprintf('select sent, sendstart from %s where id = %d', $tables['message'], $messageid));
1382            $timetaken = Sql_Fetch_Row($rs);
1383            processQueueOutput($GLOBALS['I18N']->get('It took').' '.timeDiff($timetaken[0],
1384                    $timetaken[1]).' '.$GLOBALS['I18N']->get('to send this message'));
1385            sendMessageStats($messageid);
1386        }
1387        //# flush cached message track stats to the DB
1388        if (isset($GLOBALS['cached']['linktracksent'])) {
1389            flushClicktrackCache();
1390            // we're done with $messageid, so get rid of the cache
1391            unset($GLOBALS['cached']['linktracksent'][$messageid]);
1392        }
1393    } else {
1394        if ($script_stage < 5) {
1395            $script_stage = 5;
1396        }
1397    }
1398}
1399
// When $num_messages is empty/zero, mark the run as being at stage 6.
if (!$num_messages) {
    $script_stage = 6;
} // we are done
// shutdown will take care of reporting
1404