1<?php
2/**
3 * Matomo - free/libre analytics platform
4 *
5 * @link https://matomo.org
6 * @license http://www.gnu.org/licenses/gpl-3.0.html GPL v3 or later
7 *
8 */
9namespace Piwik\CronArchive;
10
11use Exception;
12use Piwik\CliMulti\Process;
13use Piwik\Log;
14use Piwik\Option;
15
16/**
 * This class saves all site IDs that still have to be processed in an Option (named 'SharedSiteIdsToArchive' by
 * default) and processes all sites within that list. If a user starts multiple archivers, those archivers will help
 * to finish processing that list.
19 */
class SharedSiteIds
{
    const OPTION_DEFAULT = 'SharedSiteIdsToArchive';
    const OPTION_ALL_WEBSITES = 'SharedSiteIdsToArchive_AllWebsites';

    /** Suffix appended to the option name to build the key holding the last queue-reset timestamp. */
    const KEY_TIMESTAMP = '_ResetQueueTime';

    /**
     * Name of the Option in which the comma separated list of remaining site IDs is stored.
     *
     * @var string
     */
    private $optionName;

    /**
     * Site IDs of the queue as seen when this instance was created (never shrinks afterwards).
     *
     * @var array
     */
    private $siteIds = [];

    /**
     * Site ID most recently handed out by getNextSiteId(), or null if none / the queue is finished.
     *
     * @var int|string|null
     */
    private $currentSiteId;

    /**
     * Set once getNextSiteId() has returned null; from then on the shared queue is never consulted again.
     *
     * @var bool
     */
    private $done = false;

    /**
     * Queue-reset timestamp read at the end of the constructor, used to detect later resets.
     *
     * @var int|null
     */
    private $initialResetQueueTime = null;

    /**
     * True when the constructor found an already stored queue and joined it instead of creating a new one.
     *
     * @var bool
     */
    private $isContinuingPreviousRun = false;

    /**
     * Joins the stored queue if one exists, otherwise stores $websiteIds as a new queue and records the reset time.
     *
     * @param array|null $websiteIds Site IDs to queue when no queue is stored yet; any empty value means "no sites".
     * @param string $optionName Name of the Option used to persist the queue.
     * @throws \Exception
     */
    public function __construct($websiteIds, $optionName = self::OPTION_DEFAULT)
    {
        $this->optionName = $optionName;

        if (empty($websiteIds)) {
            $websiteIds = [];
        }

        $this->siteIds = $this->runExclusive(function () use ($websiteIds) {
            // if there are already sites to be archived registered, prefer the existing list, meaning help
            // to finish this queue of sites instead of starting a new queue
            $existingWebsiteIds = $this->getAllSiteIdsToArchive();

            if (!empty($existingWebsiteIds)) {
                $this->isContinuingPreviousRun = true;
                return $existingWebsiteIds;
            }

            $this->setQueueWasReset();
            $this->setSiteIdsToArchive($websiteIds);

            return $websiteIds;
        });

        // remember the reset time that belongs to the queue we work on, see getNextSiteId()
        $this->initialResetQueueTime = $this->getResetQueueTime();
    }

    /**
     * Stores the current time (in milliseconds) as the moment the queue was last reset.
     */
    public function setQueueWasReset()
    {
        Option::set($this->optionName . self::KEY_TIMESTAMP, floor(microtime(true) * 1000));
    }

    /**
     * Reads the stored queue-reset timestamp, bypassing the Option cache.
     *
     * @return int
     */
    private function getResetQueueTime()
    {
        $timestampOption = $this->optionName . self::KEY_TIMESTAMP;

        // clear the cached value first so a change made by another process is seen
        Option::clearCachedOption($timestampOption);

        return (int) Option::get($timestampOption);
    }

    /**
     * Get the site IDs of the queue as they were when this instance was created.
     *
     * @return array
     */
    public function getInitialSiteIds()
    {
        return $this->siteIds;
    }

    /**
     * Get the number of total websites that need to be processed.
     *
     * @return int
     */
    public function getNumSites()
    {
        return count($this->siteIds);
    }

    /**
     * Get the number of already processed websites (not necessarily all of those were processed by this archiver).
     *
     * The number is derived from the position of the current site ID within the initial list of site IDs.
     *
     * @return int
     */
    public function getNumProcessedWebsites()
    {
        if ($this->done) {
            return $this->getNumSites();
        }

        if (empty($this->currentSiteId)) {
            return 0;
        }

        // Re-index the list so the result is a position and not an arbitrary array key. The comparison is loose on
        // purpose: the current ID is read back as a string while the initial list may contain integers.
        $position = array_search($this->currentSiteId, array_values($this->siteIds));

        if (false === $position) {
            return 0;
        }

        return $position + 1;
    }

    /**
     * Persists the given site IDs as a comma separated list, or deletes the option when the list is empty.
     *
     * @param array $siteIds
     */
    public function setSiteIdsToArchive($siteIds)
    {
        if (empty($siteIds)) {
            Option::delete($this->optionName);
            return;
        }

        Option::set($this->optionName, implode(',', $siteIds));
    }

    /**
     * Reads the list of site IDs currently stored in the option, bypassing the Option cache.
     *
     * @return string[] the stored site IDs, or an empty array if nothing is stored
     */
    public function getAllSiteIdsToArchive()
    {
        Option::clearCachedOption($this->optionName);
        $siteIdsToArchive = Option::get($this->optionName);

        if (empty($siteIdsToArchive)) {
            return [];
        }

        return explode(',', trim($siteIdsToArchive));
    }

    /**
     * If there are multiple archivers running on the same node it makes sure only one of them performs an action and
     * it will wait until another one has finished. Any closure you pass here should be very fast as other processes
     * wait for this closure to finish otherwise. Currently only used for making multiple archivers at the same time
     * work. If a closure takes more than 5 seconds we assume it is dead and simply continue.
     *
     * @param \Closure $closure
     * @return mixed whatever the closure returns
     * @throws \Exception
     */
    private function runExclusive($closure)
    {
        $process = new Process('archive.sharedsiteids');

        while ($process->isRunning() && $process->getSecondsSinceCreation() < 5) {
            // wait max 5 seconds, such an operation should not take longer
            usleep(25 * 1000);
        }

        $process->startProcess();

        try {
            return $closure();
        } finally {
            // always release the marker, also when the closure fails with an \Error and not only an \Exception,
            // otherwise other archivers would have to wait for the timeout above
            $process->finishProcess();
        }
    }

    /**
     * Get the next site id that needs to be processed or null if all site ids were processed.
     *
     * The returned ID is removed from the stored queue. Once null was returned, every further call returns null.
     *
     * @return int|string|null the site ID as read from the stored list (a numeric string), or null when finished
     * @throws \Exception
     */
    public function getNextSiteId()
    {
        if ($this->done) {
            // we make sure we don't check again whether there are more sites to be archived as the list of
            // sharedSiteIds may have been reset by now.
            return null;
        }

        if ($this->initialResetQueueTime !== $this->getResetQueueTime()) {
            // queue was reset/finished by some other process
            $this->currentSiteId = null;
            $this->done = true;
            Log::debug('The shared site ID queue was reset, stopping.');
            return null;
        }

        $this->currentSiteId = $this->runExclusive(function () {
            $remainingSiteIds = $this->getAllSiteIdsToArchive();

            if (empty($remainingSiteIds)) {
                // done... no sites left to be archived
                return null;
            }

            // take the first site and write the shortened list back
            $nextSiteId = array_shift($remainingSiteIds);
            $this->setSiteIdsToArchive($remainingSiteIds);

            return $nextSiteId;
        });

        if (is_null($this->currentSiteId)) {
            $this->done = true;
        }

        return $this->currentSiteId;
    }

    /**
     * Whether sharing the queue between archivers is supported, which is the case when Process is supported.
     *
     * @return bool
     */
    public static function isSupported()
    {
        return Process::isSupported();
    }

    /**
     * Whether this instance joined an already existing queue instead of starting a new one.
     *
     * @return bool
     */
    public function isContinuingPreviousRun(): bool
    {
        return $this->isContinuingPreviousRun;
    }
}
228