Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
73.21% covered (warning)
73.21%
41 / 56
66.67% covered (warning)
66.67%
12 / 18
CRAP
0.00% covered (danger)
0.00%
0 / 1
Queue
73.21% covered (warning)
73.21%
41 / 56
66.67% covered (warning)
66.67%
12 / 18
35.07
0.00% covered (danger)
0.00%
0 / 1
 __construct
100.00% covered (success)
100.00%
5 / 5
100.00% covered (success)
100.00%
1 / 1
2
 __toString
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 __toPhpCode
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
2
 initialize
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getName
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 setWeight
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
1
 getWeight
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 setBroker
100.00% covered (success)
100.00%
3 / 3
100.00% covered (success)
100.00%
1 / 1
1
 getBroker
100.00% covered (success)
100.00%
2 / 2
100.00% covered (success)
100.00%
1 / 1
1
 enqueue
69.23% covered (warning)
69.23%
9 / 13
0.00% covered (danger)
0.00%
0 / 1
4.47
 dequeue
100.00% covered (success)
100.00%
10 / 10
100.00% covered (success)
100.00%
1 / 1
3
 acknowledge
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 count
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 isSync
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getNumberOfTasksToReceive
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 hasPreFetchedMessages
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 canDequeueTask
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 getLogContext
100.00% covered (success)
100.00%
4 / 4
100.00% covered (success)
100.00%
1 / 1
1
1<?php
2
3/**
4 * This program is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU General Public License
6 * as published by the Free Software Foundation; under version 2
7 * of the License (non-upgradable).
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the Free Software
16 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
17 *
18 * Copyright (c) 2017-2021 (original work) Open Assessment Technologies SA (under the project TAO-PRODUCT);
19 *
20 */
21
22namespace oat\tao\model\taskQueue;
23
24use oat\oatbox\log\LoggerAwareTrait;
25use oat\oatbox\mutex\LockTrait;
26use oat\oatbox\service\ServiceManager;
27use oat\tao\model\taskQueue\Queue\Broker\QueueBrokerInterface;
28use oat\tao\model\taskQueue\Queue\Broker\SyncQueueBrokerInterface;
29use oat\tao\model\taskQueue\Task\TaskInterface;
30use oat\tao\model\taskQueue\TaskLog\TaskLogAwareInterface;
31use oat\tao\model\taskQueue\TaskLog\TaskLogAwareTrait;
32use Zend\ServiceManager\ServiceLocatorAwareInterface;
33use Zend\ServiceManager\ServiceLocatorAwareTrait;
34
/**
 * A named task queue that delegates message storage to a broker and records
 * task state changes in the task log.
 *
 * Enqueue and dequeue operations are guarded by a per-task lock whose
 * identifier is LOCK_PREFIX followed by the task id.
 *
 * @author Gyula Szucs <gyula@taotesting.com>
 */
class Queue implements QueueInterface, TaskLogAwareInterface
{
    use LoggerAwareTrait;
    use ServiceLocatorAwareTrait;
    use TaskLogAwareTrait;
    use LockTrait;

    /**
     * Prefix of the lock identifier; the task id is appended to it.
     */
    public const LOCK_PREFIX = 'taskqueue_lock_';

    /**
     * @var string
     */
    private $name;

    /**
     * @var QueueBrokerInterface|ServiceLocatorAwareInterface
     */
    private $broker;

    /**
     * @var int
     */
    private $weight;

    /**
     * @param string               $name   Must not be empty (as defined by empty()).
     * @param QueueBrokerInterface $broker
     * @param int                  $weight Stored as its absolute value.
     *
     * @throws \InvalidArgumentException When $name is empty.
     */
    public function __construct($name, QueueBrokerInterface $broker, $weight = 1)
    {
        if (empty($name)) {
            throw new \InvalidArgumentException("Queue name needs to be set.");
        }

        // The name has to be assigned first: setBroker() forwards it to the broker.
        $this->name = $name;
        $this->setWeight($weight);
        $this->setBroker($broker);
    }

    /**
     * Returns the queue name.
     *
     * @inheritdoc
     */
    public function __toString()
    {
        return $this->getName();
    }

    /**
     * Builds a PHP expression that re-creates this queue from its name,
     * broker and weight.
     *
     * @inheritdoc
     */
    public function __toPhpCode()
    {
        return 'new ' . get_called_class() . '('
            . \common_Utils::toHumanReadablePhpString($this->name)
            . ', '
            . \common_Utils::toHumanReadablePhpString($this->broker)
            . ', '
            . \common_Utils::toHumanReadablePhpString($this->weight)
            . ')';
    }

    /**
     * Asks the broker to create the queue.
     *
     * @inheritdoc
     */
    public function initialize()
    {
        $this->getBroker()->createQueue();
    }

    /**
     * @inheritdoc
     */
    public function getName()
    {
        return $this->name;
    }

    /**
     * Stores the absolute value of the given weight.
     *
     * @param int $weight
     *
     * @return Queue
     */
    public function setWeight($weight)
    {
        $this->weight = abs($weight);

        return $this;
    }

    /**
     * @inheritdoc
     */
    public function getWeight()
    {
        return $this->weight;
    }

    /**
     * Stores the broker and passes this queue's name on to it.
     *
     * @inheritdoc
     */
    public function setBroker(QueueBrokerInterface $broker)
    {
        $this->broker = $broker;

        $this->broker->setQueueName($this->getName());

        return $this;
    }

    /**
     * Returns the broker after handing it a service locator: this queue's own
     * locator when one is set, otherwise the global service manager.
     */
    public function getBroker(): QueueBrokerInterface
    {
        $this->broker->setServiceLocator($this->getServiceLocator() ?? ServiceManager::getServiceManager());

        return $this->broker;
    }

    /**
     * Pushes the task to the broker while holding the task's lock and, when
     * the push succeeded, adds an "enqueued" entry to the task log.
     *
     * Any \Exception raised on the way is logged and turned into a false
     * return value.
     *
     * @inheritdoc
     */
    public function enqueue(TaskInterface $task, $label = null)
    {
        // Declared before the try block so that the finally block never
        // dereferences an undefined variable when setLabel(), getId() or
        // createLock() throws before a lock exists.
        $lock = null;

        try {
            if (!is_null($label)) {
                $task->setLabel($label);
            }
            $lock = $this->createLock(self::LOCK_PREFIX . $task->getId());
            $lock->acquire(true);
            $isEnqueued = $this->getBroker()->push($task);
            if ($isEnqueued) {
                $this->getTaskLog()
                    ->add($task, TaskLogInterface::STATUS_ENQUEUED, $label);
            }

            return $isEnqueued;
        } catch (\Exception $e) {
            $this->logError('Enqueueing ' . $task . ' failed with MSG: ' . $e->getMessage());
        } finally {
            // Runs on the success path as well as on failure.
            if ($lock !== null) {
                $lock->release();
            }
        }

        return false;
    }

    /**
     * Pops a task from the broker. Unless the task log reports the task as
     * cancelled, its status is set to "dequeued" while the task's lock is held.
     *
     * The popped task is returned in both cases; null is returned when the
     * broker has nothing to deliver.
     *
     * @inheritdoc
     */
    public function dequeue()
    {
        $task = $this->getBroker()->pop();
        if (!$task) {
            return null;
        }

        $lock = $this->createLock(self::LOCK_PREFIX . $task->getId());
        $lock->acquire(true);

        try {
            if ($this->canDequeueTask($task)) {
                $this->getTaskLog()->setStatus($task->getId(), TaskLogInterface::STATUS_DEQUEUED);
                $this->logInfo(sprintf('Task %s has been dequeued', $task->getId()), $this->getLogContext());
            }
        } finally {
            // Release even when the task log or the logger throws, so the
            // task's lock is not left held.
            $lock->release();
        }

        return $task;
    }

    /**
     * Deletes the task from the broker.
     *
     * @inheritdoc
     */
    public function acknowledge(TaskInterface $task)
    {
        $this->getBroker()->delete($task);
    }

    /**
     * Count of messages in the queue, as reported by the broker.
     */
    public function count(): int
    {
        return $this->getBroker()->count();
    }

    /**
     * Tells whether the configured broker implements SyncQueueBrokerInterface.
     *
     * @return bool
     */
    public function isSync()
    {
        return $this->broker instanceof SyncQueueBrokerInterface;
    }

    /**
     * Delegates to the broker.
     *
     * @inheritdoc
     */
    public function getNumberOfTasksToReceive()
    {
        return $this->getBroker()->getNumberOfTasksToReceive();
    }

    /**
     * Delegates to the broker.
     *
     * @inheritdoc
     */
    public function hasPreFetchedMessages(): bool
    {
        return $this->getBroker()->hasPreFetchedMessages();
    }

    /**
     * A task may be marked as dequeued unless the task log reports it as cancelled.
     *
     * @param TaskInterface $task
     *
     * @return bool
     */
    protected function canDequeueTask(TaskInterface $task)
    {
        // NOTE(review): loose comparison kept on purpose; the return type of
        // getStatus() is not visible here - confirm before switching to !==.
        return $this->getTaskLog()->getStatus($task->getId()) != TaskLogInterface::STATUS_CANCELLED;
    }

    /**
     * Context attached to log entries: current process id and queue name.
     *
     * @return array
     */
    protected function getLogContext()
    {
        return [
            'PID' => getmypid(),
            'QueueName' => $this->getName(),
        ];
    }
}