<?php
/**
 * Matomo - free/libre analytics platform
 *
 * @link https://matomo.org
 * @license http://www.gnu.org/licenses/gpl-3.0.html GPL v3 or later
 *
 */
namespace Piwik\CronArchive;

use Exception;
use Piwik\CliMulti\Process;
use Piwik\Log;
use Piwik\Option;

/**
 * This class saves all site IDs that still have to be processed in an Option (named 'SharedSiteIdsToArchive' by
 * default) and hands them out one by one. If a user starts multiple archivers, those archivers will help to finish
 * processing that same list instead of each starting their own.
 */
class SharedSiteIds
{
    const OPTION_DEFAULT = 'SharedSiteIdsToArchive';
    const OPTION_ALL_WEBSITES = 'SharedSiteIdsToArchive_AllWebsites';

    /** Suffix appended to the option name to build the name of the "queue was reset at" timestamp option. */
    const KEY_TIMESTAMP = '_ResetQueueTime';

    /**
     * Name of the Option holding the comma separated list of site IDs that are left to archive.
     *
     * @var string
     */
    private $optionName;

    /**
     * The list of site IDs this instance started with: either the queue found in the Option at construction time
     * or the list passed to the constructor.
     *
     * @var array
     */
    private $siteIds = [];

    /** The site ID most recently returned by getNextSiteId(), or null if none was handed out (yet). */
    private $currentSiteId;

    /** Becomes true once getNextSiteId() found nothing left to do or detected a queue reset. */
    private $done = false;

    /** Value of the reset timestamp option as read at the end of the constructor. */
    private $initialResetQueueTime = null;

    /** True when a non-empty queue already existed at construction time and was adopted. */
    private $isContinuingPreviousRun = false;

    /**
     * Joins an existing queue of site IDs if one is stored, otherwise creates a new queue from $websiteIds.
     *
     * @param array|null $websiteIds site IDs to queue when no queue exists yet; any empty value is treated as
     *                               an empty list
     * @param string $optionName name of the Option used to store the queue
     * @throws \Exception anything thrown while reading or writing the queue is propagated
     */
    public function __construct($websiteIds, $optionName = self::OPTION_DEFAULT)
    {
        $this->optionName = $optionName;

        if (empty($websiteIds)) {
            $websiteIds = [];
        }

        $this->siteIds = $this->runExclusive(function () use ($websiteIds) {
            // if there are already sites registered to be archived, prefer that existing list, meaning help
            // to finish this queue of sites instead of starting a new queue
            $existingWebsiteIds = $this->getAllSiteIdsToArchive();

            if (!empty($existingWebsiteIds)) {
                $this->isContinuingPreviousRun = true;
                return $existingWebsiteIds;
            }

            $this->setQueueWasReset();
            $this->setSiteIdsToArchive($websiteIds);

            return $websiteIds;
        });

        // remembered so getNextSiteId() can detect that the queue was reset after this instance was created
        $this->initialResetQueueTime = $this->getResetQueueTime();
    }

    /**
     * Stores the current time in milliseconds as the moment the queue was (re)created.
     */
    public function setQueueWasReset()
    {
        Option::set($this->optionName . self::KEY_TIMESTAMP, floor(microtime(true) * 1000));
    }

    /**
     * Reads the stored reset timestamp, clearing the cached option value first so the value is fetched again.
     *
     * @return int
     */
    private function getResetQueueTime()
    {
        Option::clearCachedOption($this->optionName . self::KEY_TIMESTAMP);
        return (int) Option::get($this->optionName . self::KEY_TIMESTAMP);
    }

    /**
     * Get the list of site IDs this instance started with (see the constructor).
     *
     * @return array
     */
    public function getInitialSiteIds()
    {
        return $this->siteIds;
    }

    /**
     * Get the total number of websites that need to be processed.
     *
     * @return int
     */
    public function getNumSites()
    {
        return count($this->siteIds);
    }

    /**
     * Get the number of already processed websites (not necessarily all of those were processed by this archiver).
     *
     * The number is derived from the position of the current site ID within the initial list.
     *
     * @return int
     */
    public function getNumProcessedWebsites()
    {
        if ($this->done) {
            return $this->getNumSites();
        }

        if (empty($this->currentSiteId)) {
            return 0;
        }

        // Deliberately a non-strict search: the current site ID comes from explode() and is therefore a string,
        // while the initial list may be the array handed to the constructor as-is.
        $index = array_search($this->currentSiteId, $this->siteIds);

        if (false === $index) {
            return 0;
        }

        return $index + 1;
    }

    /**
     * Persists the given list as the remaining queue, or deletes the Option when the list is empty.
     *
     * @param array $siteIds
     */
    public function setSiteIdsToArchive($siteIds)
    {
        if (!empty($siteIds)) {
            Option::set($this->optionName, implode(',', $siteIds));
        } else {
            Option::delete($this->optionName);
        }
    }

    /**
     * Reads the remaining queue from the Option, clearing the cached option value first.
     *
     * @return string[] the remaining site IDs, or an empty array when nothing is stored
     */
    public function getAllSiteIdsToArchive()
    {
        Option::clearCachedOption($this->optionName);
        $siteIdsToArchive = Option::get($this->optionName);

        if (empty($siteIdsToArchive)) {
            return [];
        }

        return explode(',', trim($siteIdsToArchive));
    }

    /**
     * If there are multiple archivers running on the same node it makes sure only one of them performs an action and
     * it will wait until another one has finished. Any closure you pass here should be very fast as other processes
     * wait for this closure to finish otherwise. Currently only used for making multiple archivers at the same time
     * work. If a closure takes more than 5 seconds we assume it is dead and simply continue.
     *
     * @param \Closure $closure
     * @return mixed whatever the closure returns
     * @throws \Exception anything the closure throws is propagated after the process has been marked as finished
     */
    private function runExclusive($closure)
    {
        $process = new Process('archive.sharedsiteids');

        while ($process->isRunning() && $process->getSecondsSinceCreation() < 5) {
            // wait max 5 seconds, such an operation should not take longer
            usleep(25 * 1000);
        }

        $process->startProcess();

        try {
            return $closure();
        } finally {
            // Runs on normal return as well as for every Throwable (including \Error, which a
            // `catch (Exception $e)` would miss), so other archivers are never left waiting on us.
            $process->finishProcess();
        }
    }

    /**
     * Get the next site id that needs to be processed or null if all site ids were processed.
     *
     * The returned ID is removed from the stored queue. Once null has been returned, every further call returns
     * null as well.
     *
     * @return int|string|null the ID is taken from the stored comma separated list, so it is a numeric string
     * @throws \Exception anything thrown while reading or writing the queue is propagated
     */
    public function getNextSiteId()
    {
        if ($this->done) {
            // we make sure we don't check again whether there are more sites to be archived as the list of
            // sharedSiteIds may have been reset by now.
            return null;
        }

        if ($this->initialResetQueueTime !== $this->getResetQueueTime()) {
            // queue was reset/finished by some other process
            $this->currentSiteId = null;
            $this->done = true;
            Log::debug('The shared site ID queue was reset, stopping.');
            return null;
        }

        $this->currentSiteId = $this->runExclusive(function () {
            $siteIds = $this->getAllSiteIdsToArchive();

            if (empty($siteIds)) {
                // done... no sites left to be archived
                return null;
            }

            $nextSiteId = array_shift($siteIds);

            $this->setSiteIdsToArchive($siteIds);

            return $nextSiteId;
        });

        if (is_null($this->currentSiteId)) {
            $this->done = true;
        }

        return $this->currentSiteId;
    }

    /**
     * Whether sharing a queue between archivers is possible; simply forwards Process::isSupported().
     *
     * @return bool
     */
    public static function isSupported()
    {
        return Process::isSupported();
    }

    /**
     * Whether this instance adopted a queue that already existed when it was constructed.
     *
     * @return bool
     */
    public function isContinuingPreviousRun(): bool
    {
        return $this->isContinuingPreviousRun;
    }
}