1<?php
2/***********************************************
3* File      :   ipcmemcachedprovider.php
4* Project   :   Z-Push
5* Descr     :   IPC provider using Memcached PHP extension
6*               and memcached servers defined in
7*               $zpush_ipc_memcached_servers
8*
9* Created   :   22.11.2015 by Ralf Becker <rb@stylite.de>
10*
11* Copyright 2007 - 2016 Zarafa Deutschland GmbH
12*
13* This program is free software: you can redistribute it and/or modify
14* it under the terms of the GNU Affero General Public License, version 3,
15* as published by the Free Software Foundation.
16*
17* This program is distributed in the hope that it will be useful,
18* but WITHOUT ANY WARRANTY; without even the implied warranty of
19* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20* GNU Affero General Public License for more details.
21*
22* You should have received a copy of the GNU Affero General Public License
23* along with this program.  If not, see <http://www.gnu.org/licenses/>.
24*
25* Consult LICENSE file for details
26************************************************/
27//include own config file
28require_once("backend/ipcmemcached/config.php");
29
30class IpcMemcachedProvider implements IIpcProvider {
31    protected $type;
32    protected $serverKey;
33    private $typeMutex;
34    private $maxWaitCycles;
35    private $logWaitCycles;
36    private $isDownUntil;
37    private $wasDown;
38    private $reconnectCount;
39    private $globalMutex;
40
41    /**
42     * Instance of memcached class
43     *
44     * @var memcached
45     */
46    protected $memcached;
47
48
49    /**
50     * Constructor
51     *
52     * @param int $type
53     * @param int $allocate
54     * @param string $class
55     * @param string $serverKey
56     * @param boolean $globalMutex (opt)    When true, it configures a single server pool taking the first one from MEMCACHED_SERVERS.
57     */
58    public function __construct($type, $allocate, $class, $serverKey) {
59        $this->type = $type;
60        $this->typeMutex = $type . "MX";
61        $this->serverKey = $serverKey;
62        $this->maxWaitCycles = round(MEMCACHED_MUTEX_TIMEOUT * 1000 / MEMCACHED_BLOCK_WAIT)+1;
63        $this->logWaitCycles = round($this->maxWaitCycles/5);
64        $this->globalMutex = (strcmp($allocate, 'globalmutex') === 0) ? true : false;
65
66        // not used, but required by function signature
67        unset($allocate, $class);
68
69        if (!class_exists('Memcached')) {
70            throw new FatalMisconfigurationException("IpcMemcachedProvider failure: can not find class Memcached. Please make sure the php memcached extension is installed.");
71        }
72
73        $this->reconnectCount = 0;
74        $this->init();
75
76        // check if memcached was down recently
77        $this->isDownUntil = $this->getIsDownUntil();
78        $this->wasDown = ! $this->IsActive();
79    }
80
81    /**
82     * Initializes the Memcached object & connection.
83     *
84     * @access private
85     * @return void
86     */
87    private function init() {
88        if ($this->globalMutex === false) {
89            $this->memcached = new Memcached(md5(MEMCACHED_SERVERS) . $this->reconnectCount++);
90        } else{
91            $this->memcached = new Memcached('globalmutex' . $this->reconnectCount++);
92        }
93        $this->memcached->setOptions(array(
94            // setting a short timeout, to better kope with failed nodes
95            Memcached::OPT_CONNECT_TIMEOUT => MEMCACHED_TIMEOUT,
96            Memcached::OPT_SEND_TIMEOUT => MEMCACHED_TIMEOUT * 1000,
97            Memcached::OPT_RECV_TIMEOUT => MEMCACHED_TIMEOUT * 1000,
98
99            // use igbinary, if available
100            Memcached::OPT_SERIALIZER => Memcached::HAVE_IGBINARY ? Memcached::SERIALIZER_IGBINARY : (Memcached::HAVE_JSON ? Memcached::SERIALIZER_JSON : Memcached::SERIALIZER_PHP),
101            // use more efficient binary protocol (also required for consistent hashing)
102            Memcached::OPT_BINARY_PROTOCOL => true,
103            // enable Libketama compatible consistent hashing
104            Memcached::OPT_LIBKETAMA_COMPATIBLE => true,
105            // automatic failover and disabling of failed nodes
106            Memcached::OPT_SERVER_FAILURE_LIMIT => 2,
107            Memcached::OPT_AUTO_EJECT_HOSTS => true,
108            // setting a prefix for all keys
109            Memcached::OPT_PREFIX_KEY => MEMCACHED_PREFIX,
110        ));
111
112        // with persistent connections, only add servers, if they not already added!
113        if (!count($this->memcached->getServerList())) {
114            if ($this->globalMutex === false) {
115                foreach(explode(',', MEMCACHED_SERVERS) as $host_port) {
116                    list($host,$port) = explode(':', trim($host_port));
117                    $this->memcached->addServer($host, $port);
118                }
119            } else{
120                $memcachedServersList = explode(',', MEMCACHED_SERVERS);
121                //get the first configured server
122                list($host,$port) = explode(':', trim($memcachedServersList[0]));
123                $this->memcached->addServer($host, $port);
124            }
125        }
126    }
127
128    /**
129     * Reinitializes the IPC data. If the provider has no way of performing
130     * this action, it should return 'false'.
131     *
132     * @access public
133     * @return boolean
134     */
135    public function ReInitIPC() {
136        // this is not supported in memcache
137        return false;
138    }
139
140    /**
141     * Cleans up the IPC data block.
142     *
143     * @access public
144     * @return boolean
145     */
146    public function Clean() {
147        return false;
148    }
149
150    /**
151     * Indicates if the IPC is active.
152     *
153     * @access public
154     * @return boolean
155     */
156    public function IsActive() {
157        $down = $this->isDownUntil > time();
158        // reconnect if we were down but should retry now
159        if (!$down && $this->wasDown) {
160            ZLog::Write(LOGLEVEL_DEBUG, "IpcMemcachedProvider->IsActive(): memcache was down, trying to reconnect");
161            $this->init();
162            $this->wasDown = false;
163        }
164        return !$down;
165    }
166
167    /**
168     * Blocks the class mutex.
169     * Method blocks until mutex is available!
170     * ATTENTION: make sure that you *always* release a blocked mutex!
171     *
172     * We try to add mutex to our cache, until we succeed.
173     * It will fail as long other client has stored it or the
174     * MEMCACHED_MUTEX_TIMEOUT is reached.
175     *
176     * @access public
177     * @return boolean
178     */
179    public function BlockMutex() {
180        if (!$this->IsActive()) {
181            return false;
182        }
183
184        $n = 0;
185        while(!$this->memcached->addByKey($this->serverKey, $this->typeMutex, true, MEMCACHED_MUTEX_TIMEOUT)) {
186            if (++$n % $this->logWaitCycles == 0) {
187                ZLog::Write(LOGLEVEL_DEBUG, sprintf("IpcMemcachedProvider->BlockMutex() waiting to aquire mutex for type: %s ", $this->typeMutex));
188            }
189            // wait before retrying
190            usleep(MEMCACHED_BLOCK_WAIT * 1000);
191            if ($n > $this->maxWaitCycles) {
192                ZLog::Write(LOGLEVEL_ERROR, sprintf("IpcMemcachedProvider->BlockMutex() could not aquire mutex for type: %s. Check memcache service!", $this->typeMutex));
193                $this->markAsDown();
194                return false;
195            }
196        }
197        if ($n*MEMCACHED_BLOCK_WAIT > 50) {
198            ZLog::Write(LOGLEVEL_WARN, sprintf("IpcMemcachedProvider->BlockMutex() mutex aquired after waiting for %sms for type: %s", ($n*MEMCACHED_BLOCK_WAIT), $this->typeMutex));
199        }
200        return true;
201    }
202
203    /**
204     * Releases the class mutex.
205     * After the release other processes are able to block the mutex themselves.
206     *
207     * @access public
208     * @return boolean
209     */
210    public function ReleaseMutex() {
211        return $this->memcached->deleteByKey($this->serverKey, $this->typeMutex);
212    }
213
214    /**
215     * Indicates if the requested variable is available in IPC data.
216     *
217     * @param int   $id     int indicating the variable
218     *
219     * @access public
220     * @return boolean
221     */
222    public function HasData($id = 2) {
223        $this->memcached->getByKey($this->serverKey, $this->type.':'.$id);
224        return $this->memcached->getResultCode() === Memcached::RES_SUCCESS;
225    }
226
227    /**
228     * Returns the requested variable from IPC data.
229     *
230     * @param int   $id     int indicating the variable
231     *
232     * @access public
233     * @return mixed
234     */
235    public function GetData($id = 2) {
236        return $this->memcached->getByKey($this->serverKey, $this->type.':'.$id);
237    }
238
239    /**
240     * Writes the transmitted variable to IPC data.
241     * Subclasses may never use an id < 2!
242     *
243     * @param mixed $data   data which should be saved into IPC data
244     * @param int   $id     int indicating the variable (bigger than 2!)
245     *
246     * @access public
247     * @return boolean
248     */
249    public function SetData($data, $id = 2) {
250        return $this->memcached->setByKey($this->serverKey, $this->type.':'.$id, $data);
251    }
252
253    /**
254     * Gets the epoch time until the memcache server should not be retried.
255     * If there is no data available, 0 is returned.
256     *
257     * @access private
258     * @return long
259     */
260    private function getIsDownUntil() {
261        if (file_exists(MEMCACHED_DOWN_LOCK_FILE)) {
262            $timestamp = file_get_contents(MEMCACHED_DOWN_LOCK_FILE);
263            // is the lock file expired?
264            if ($timestamp > time()) {
265                ZLog::Write(LOGLEVEL_WARN, sprintf("IpcMemcachedProvider(): Memcache service is marked as down until %s.", strftime("%d.%m.%Y %H:%M:%S", $timestamp)));
266                return $timestamp;
267            }
268            else {
269                @unlink(MEMCACHED_DOWN_LOCK_FILE);
270            }
271        }
272        return 0;
273    }
274
275    /**
276     * Indicates that memcache is not available and that it should not be retried.
277     *
278     * @access private
279     * @return boolean
280     */
281    private function markAsDown() {
282        ZLog::Write(LOGLEVEL_WARN, sprintf("IpcMemcachedProvider(): Marking memcache service as down for %d seconds.", MEMCACHED_DOWN_LOCK_EXPIRATION));
283        $downUntil = time() + MEMCACHED_DOWN_LOCK_EXPIRATION;
284        $this->isDownUntil = $downUntil;
285        $this->wasDown = true;
286        return !!file_put_contents(MEMCACHED_DOWN_LOCK_FILE, $downUntil);
287    }
288}