<?php
// (c) Copyright by authors of the Tiki Wiki CMS Groupware Project
//
// All Rights Reserved. See copyright.txt for details and a complete list of authors.
// Licensed under the GNU LESSER GENERAL PUBLIC LICENSE. See license.txt for details.
// $Id$

/**
 * Message queue stored in the `tiki_queue` table.
 *
 * Messages are arrays that get JSON-encoded on the way in (push) and decoded
 * back into associative arrays on the way out (pull). Rows are addressed by
 * the `queue` column; the `handler` column is used by pull() to tag the rows
 * it is about to consume.
 */
class QueueLib extends TikiDb_Bridge
{
    /** Table accessor for `tiki_queue`, obtained from $this->table(). */
    private $queue;

    /**
     * Binds the table accessor used by every other method.
     */
    public function __construct()
    {
        $this->queue = $this->table('tiki_queue');
    }

    /**
     * Pushes every message of the list onto the given queue, one push() call
     * per message, in iteration order.
     *
     * @param mixed $queue    Queue identifier, stored in the `queue` column.
     * @param array $messages List of messages; each element must itself be an array.
     */
    public function pushAll($queue, array $messages)
    {
        foreach ($messages as $entry) {
            $this->push($queue, $entry);
        }
    }

    /**
     * Inserts a single message row for the given queue.
     *
     * The row holds the queue identifier, the `now` value read from the
     * 'tiki' library, and the JSON encoding of the message.
     *
     * @param mixed $queue   Queue identifier, stored in the `queue` column.
     * @param array $message Payload to be JSON-encoded.
     */
    public function push($queue, array $message)
    {
        $row = [
            'queue' => $queue,
            'timestamp' => TikiLib::lib('tiki')->now,
            'message' => json_encode($message),
        ];

        $this->queue->insert($row);
    }

    /**
     * Deletes the rows whose `queue` column matches the given identifier.
     *
     * @param mixed $queue Queue identifier.
     */
    public function clear($queue)
    {
        $conditions = ['queue' => $queue];

        $this->queue->deleteMultiple($conditions);
    }

    /**
     * Returns the result of fetchCount() for rows matching the given queue.
     *
     * @param mixed $queue Queue identifier.
     * @return mixed Whatever the table accessor's fetchCount() returns
     *               (presumably an integer row count — confirm against TikiDb).
     */
    public function count($queue)
    {
        $conditions = ['queue' => $queue];

        return $this->queue->fetchCount($conditions);
    }

    /**
     * Takes messages off the queue and returns them decoded.
     *
     * Steps: tag rows of this queue that have a null handler with a fresh
     * uniqid() token, read back the `message` column of the tagged rows,
     * delete those rows, drop duplicate message strings, then JSON-decode the
     * remainder as associative arrays.
     *
     * @param mixed $queue Queue identifier.
     * @param int   $count Passed as third argument to updateMultiple()
     *                     (presumably the maximum number of rows to tag — confirm).
     * @return array Sequentially indexed list of decoded messages; empty when
     *               nothing was fetched.
     */
    public function pull($queue, $count = 1)
    {
        $token = uniqid();

        // Claim the rows: stamp our token on entries nobody has tagged yet.
        $claimFilter = [
            'queue' => $queue,
            'handler' => null,
        ];
        $this->queue->updateMultiple(['handler' => $token], $claimFilter, $count);

        // Everything carrying our token is ours: read it, then remove it.
        $ownedFilter = ['handler' => $token];
        $fetched = $this->queue->fetchColumn('message', $ownedFilter);
        $this->queue->deleteMultiple($ownedFilter);

        // Identical encoded payloads are only reported once.
        $distinct = array_unique($fetched);
        if (count($distinct) === 0) {
            return [];
        }

        // Appending with [] yields sequential keys regardless of the gaps
        // array_unique() may have left behind.
        $decoded = [];
        foreach ($distinct as $encoded) {
            $decoded[] = json_decode($encoded, true);
        }

        return $decoded;
    }
}