Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
0.00% covered (danger)
0.00%
0 / 137
0.00% covered (danger)
0.00%
0 / 29
CRAP
0.00% covered (danger)
0.00%
0 / 1
QueueDispatcher
0.00% covered (danger)
0.00%
0 / 137
0.00% covered (danger)
0.00%
0 / 29
4422
0.00% covered (danger)
0.00%
0 / 1
 __construct
0.00% covered (danger)
0.00%
0 / 11
0.00% covered (danger)
0.00%
0 / 1
42
 getQueueForTask
0.00% covered (danger)
0.00%
0 / 11
0.00% covered (danger)
0.00%
0 / 1
42
 getQueueNames
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 setQueues
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 addQueue
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
6
 removeQueue
0.00% covered (danger)
0.00%
0 / 6
0.00% covered (danger)
0.00%
0 / 1
12
 hasQueue
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getQueue
0.00% covered (danger)
0.00%
0 / 6
0.00% covered (danger)
0.00%
0 / 1
6
 getQueues
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
6
 linkTaskToQueue
0.00% covered (danger)
0.00%
0 / 10
0.00% covered (danger)
0.00%
0 / 1
12
 getLinkedTasks
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getDefaultQueue
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
12
 getFirstQueue
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 setTaskSelector
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 initialize
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
6
 createTask
0.00% covered (danger)
0.00%
0 / 12
0.00% covered (danger)
0.00%
0 / 1
20
 setOwner
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getOwner
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 enqueue
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
12
 dequeue
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 acknowledge
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 count
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
2
 isSync
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
12
 getWaitTime
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 hasPreFetchedMessages
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getTaskLog
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 propagateServices
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
6
 assertQueues
0.00% covered (danger)
0.00%
0 / 9
0.00% covered (danger)
0.00%
0 / 1
30
 assertTasks
0.00% covered (danger)
0.00%
0 / 9
0.00% covered (danger)
0.00%
0 / 1
12
1<?php
2
3/**
4 * This program is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU General Public License
6 * as published by the Free Software Foundation; under version 2
7 * of the License (non-upgradable).
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the Free Software
16 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
17 *
18 * Copyright (c) 2017 (original work) Open Assessment Technologies SA (under the project TAO-PRODUCT);
19 *
20 */
21
22namespace oat\tao\model\taskQueue;
23
24use oat\generis\model\OntologyAwareTrait;
25use oat\oatbox\service\ConfigurableService;
26use oat\oatbox\log\LoggerAwareTrait;
27use oat\tao\model\taskQueue\Queue\TaskSelector\SelectorStrategyInterface;
28use oat\tao\model\taskQueue\Queue\TaskSelector\WeightStrategy;
29use oat\tao\model\taskQueue\Task\CallbackTask;
30use oat\tao\model\taskQueue\Task\CallbackTaskInterface;
31use oat\tao\model\taskQueue\Task\QueueAssociableInterface;
32use oat\tao\model\taskQueue\Task\TaskInterface;
33use oat\tao\model\taskQueue\TaskLog\TaskLogAwareInterface;
34use oat\tao\model\taskQueue\Worker\OneTimeWorker;
35
36/**
37 * @author Gyula Szucs <gyula@taotesting.com>
38 */
39class QueueDispatcher extends ConfigurableService implements QueueDispatcherInterface
40{
41    use LoggerAwareTrait;
42    use OntologyAwareTrait;
43
44    /**
45     * @var TaskLogInterface
46     */
47    private $taskLog;
48
49    /** @var  string */
50    private $owner;
51
52    /** @var SelectorStrategyInterface */
53    private $selectorStrategy;
54
55    private $propagated = false;
56
57    /**
58     * QueueDispatcher constructor.
59     *
60     * @param array $options
61     * @throws \common_exception_Error
62     */
63    public function __construct(array $options)
64    {
65        parent::__construct($options);
66
67        $this->assertQueues();
68
69        $this->assertTasks();
70
71        if (
72            !$this->hasOption(self::OPTION_TASK_SELECTOR_STRATEGY)
73            || empty($this->getOption(self::OPTION_TASK_SELECTOR_STRATEGY))
74        ) {
75            // setting default strategy
76            $this->selectorStrategy = new WeightStrategy();
77        } else {
78            // using the strategy set in the options
79            if (!is_a($this->getOption(self::OPTION_TASK_SELECTOR_STRATEGY), SelectorStrategyInterface::class)) {
80                throw new \common_exception_Error('Task selector must implement ' . SelectorStrategyInterface::class);
81            }
82
83            $this->selectorStrategy = $this->getOption(self::OPTION_TASK_SELECTOR_STRATEGY);
84        }
85
86        if (!$this->hasOption(self::OPTION_TASK_LOG) || empty($this->getOption(self::OPTION_TASK_LOG))) {
87            throw new \common_exception_Error('Task Log service needs to be set.');
88        }
89    }
90
91    /**
92     * @param TaskInterface $task
93     * @return QueueInterface
94     */
95    protected function getQueueForTask(TaskInterface $task)
96    {
97        $action = $task instanceof CallbackTaskInterface && is_object($task->getCallable())
98            ? $task->getCallable()
99            : $task;
100
101        // getting queue name using the implemented getter function
102        if (
103            $action instanceof QueueAssociableInterface
104            && ($queueName = $action->getQueueName($task->getParameters()))
105        ) {
106            return $this->getQueue($queueName);
107        }
108
109        // getting the queue name based on the linked tasks configuration
110        $className = get_class($action);
111        if (array_key_exists($className, $this->getLinkedTasks())) {
112            $queueName = $this->getLinkedTasks()[$className];
113
114            return $this->getQueue($queueName);
115        }
116
117        // if we still don't have a queue, let's use the default one
118        return $this->getDefaultQueue();
119    }
120
121    /**
122     * @inheritdoc
123     */
124    public function getQueueNames()
125    {
126        return array_map(function (QueueInterface $queue) {
127            return $queue->getName();
128        }, $this->getOption(self::OPTION_QUEUES));
129    }
130
131    /**
132     * @inheritdoc
133     */
134    public function setQueues(array $queues)
135    {
136        $this->propagated = false;
137
138        $this->setOption(self::OPTION_QUEUES, $queues);
139
140        return $this;
141    }
142
143    /**
144     * @inheritdoc
145     * @throws \LogicException
146     */
147    public function addQueue(QueueInterface $queue)
148    {
149        if ($this->hasQueue($queue->getName())) {
150            throw new \LogicException('Queue "' . $queue . '" is already registered.');
151        }
152
153        $this->propagated = false;
154
155        $queues = $this->getQueues();
156        $queues[] = $queue;
157
158        $this->setOption(self::OPTION_QUEUES, $queues);
159
160        return $this;
161    }
162
163    public function removeQueue(string $queueName): self
164    {
165        $queues = $this->getQueues();
166
167        foreach ($queues as $key => $queue) {
168            if ($queue->getName() === $queueName) {
169                unset($queues[$key]);
170            }
171        }
172
173        $this->setQueues(array_values($queues));
174
175        return $this;
176    }
177
178    /**
179     * @inheritdoc
180     */
181    public function hasQueue($queueName)
182    {
183        return in_array($queueName, $this->getQueueNames());
184    }
185
186    /**
187     * @inheritdoc
188     */
189    public function getQueue($queueName)
190    {
191        $foundQueue = array_filter($this->getQueues(), function (QueueInterface $queue) use ($queueName) {
192            return $queue->getName() === $queueName;
193        });
194
195        if (count($foundQueue) === 1) {
196            return reset($foundQueue);
197        }
198
199        throw new \InvalidArgumentException('Queue "' . $queueName . '" does not exist.');
200    }
201
202    /**
203     * @return QueueInterface[]
204     */
205    public function getQueues()
206    {
207        if (!$this->propagated) {
208            $queues = (array) $this->getOption(self::OPTION_QUEUES);
209
210            // propagate the services for the queues first
211            array_walk($queues, function (QueueInterface $queue) {
212                $this->propagateServices($queue);
213            });
214
215            $this->propagated = true;
216        }
217
218        return $this->getOption(self::OPTION_QUEUES);
219    }
220
221    /**
222     * @inheritdoc
223     */
224    public function linkTaskToQueue($taskName, $queueName)
225    {
226        if (is_object($taskName)) {
227            $taskName = get_class($taskName);
228        }
229
230        if (!$this->hasQueue($queueName)) {
231            throw new \LogicException(
232                'Task "' . $taskName . '" cannot be added to "' . $queueName . '". Queue is not registered.'
233            );
234        }
235
236        $tasks = $this->getLinkedTasks();
237
238        $tasks[ (string) $taskName ] = $queueName;
239
240        $this->setOption(self::OPTION_TASK_TO_QUEUE_ASSOCIATIONS, $tasks);
241
242        return $this;
243    }
244
245    /**
246     * @inheritdoc
247     */
248    public function getLinkedTasks()
249    {
250        return (array) $this->getOption(self::OPTION_TASK_TO_QUEUE_ASSOCIATIONS);
251    }
252
253    /**
254     * Return the first queue as a default one.
255     * Maybe, later we need other logic the determine the default queue.
256     *
257     * @return QueueInterface
258     */
259    public function getDefaultQueue()
260    {
261        return $this->hasOption(self::OPTION_DEFAULT_QUEUE) && $this->getOption(self::OPTION_DEFAULT_QUEUE)
262            ? $this->getQueue($this->getOption(self::OPTION_DEFAULT_QUEUE))
263            : $this->getFirstQueue();
264    }
265
266    /**
267     * Return the first queue from the array.
268     *
269     * @return QueueInterface
270     */
271    protected function getFirstQueue()
272    {
273        $queues = $this->getQueues();
274
275        return reset($queues);
276    }
277
278    /**
279     * @inheritdoc
280     */
281    public function setTaskSelector(SelectorStrategyInterface $selectorStrategy)
282    {
283        $this->setOption(self::OPTION_TASK_SELECTOR_STRATEGY, $selectorStrategy);
284
285        return $this;
286    }
287
288    /**
289     * Initialize queue.
290     *
291     * @return void
292     */
293    public function initialize()
294    {
295        foreach ($this->getQueues() as $queue) {
296            $queue->initialize();
297        }
298    }
299
300    /**
301     * @inheritdoc
302     */
303    public function createTask(
304        callable $callable,
305        array $parameters = [],
306        $label = null,
307        TaskInterface $parent = null,
308        $masterStatus = false
309    ) {
310        $id = \common_Utils::getNewUri();
311        $owner = $parent ? $parent->getOwner() : $this->getOwner();
312
313        $callbackTask = new CallbackTask($id, $owner);
314        $callbackTask->setCallable($callable)
315            ->setParameter($parameters);
316
317        if ($parent) {
318            $callbackTask->setParentId($parent->getId());
319        }
320
321        $callbackTask->setMasterStatus($masterStatus);
322
323        if ($this->enqueue($callbackTask, $label)) {
324            $callbackTask->markAsEnqueued();
325        }
326
327        return $callbackTask;
328    }
329
330    /**
331     * @param string $owner
332     */
333    public function setOwner($owner)
334    {
335        $this->owner = $owner;
336    }
337
338    /**
339     * @return string
340     * @throws \common_exception_Error
341     */
342    public function getOwner()
343    {
344        if (is_null($this->owner)) {
345            return \common_session_SessionManager::getSession()->getUser()->getIdentifier();
346        }
347
348        return $this->owner;
349    }
350
351    /**
352     * @param TaskInterface $task
353     * @param null|string   $label
354     * @return bool
355     */
356    public function enqueue(TaskInterface $task, $label = null)
357    {
358        $queue = $this->getQueueForTask($task);
359        $isEnqueued = $queue->enqueue($task, $label);
360
361        // if we need to run the task straightaway, then run a worker on-the-fly for one round.
362        if ($isEnqueued && $queue->isSync()) {
363            $oneTimeWorker = new OneTimeWorker($queue, $this->getTaskLog());
364            $this->propagate($oneTimeWorker);
365            $oneTimeWorker->run();
366        }
367
368        return $isEnqueued;
369    }
370
371    /**
372     * Receive a task from a specified queue or from a queue selected by a predefined strategy
373     *
374     * @inheritdoc
375     */
376    public function dequeue()
377    {
378        // if there is only one queue defined, let's use that
379        if (count($this->getQueues()) === 1) {
380            return $this->getFirstQueue()->dequeue();
381        }
382
383        // default: getting a task using the current task selector strategy
384        return $this->selectorStrategy->pickNextTask($this->getQueues());
385    }
386
387    /**
388     * @inheritdoc
389     */
390    public function acknowledge(TaskInterface $task)
391    {
392        $this->getQueueForTask($task)->acknowledge($task);
393    }
394
395    /**
396     * Count of messages in all queues.
397     *
398     * @return int
399     */
400    public function count(): int
401    {
402        $counts = array_map(function (QueueInterface $queue) {
403            return $queue->count();
404        }, $this->getQueues());
405
406        return array_sum($counts);
407    }
408
409    /**
410     * @inheritdoc
411     */
412    public function isSync()
413    {
414        foreach ($this->getQueues() as $queue) {
415            if (!$queue->isSync()) {
416                return false;
417            }
418        }
419
420        return true;
421    }
422
423    public function getWaitTime()
424    {
425        return $this->selectorStrategy->getWaitTime();
426    }
427
428    /**
429     * @inheritdoc
430     */
431    public function hasPreFetchedMessages(): bool
432    {
433        return true;
434    }
435
436    /**
437     * @return TaskLogInterface
438     */
439    protected function getTaskLog()
440    {
441        if (is_null($this->taskLog)) {
442            $this->taskLog = $this->getServiceLocator()->get($this->getOption(self::OPTION_TASK_LOG));
443        }
444
445        return $this->taskLog;
446    }
447
448    /**
449     * @param QueueInterface $queue
450     * @return QueueInterface
451     */
452    protected function propagateServices(QueueInterface $queue)
453    {
454        $this->propagate($queue);
455
456        if ($queue instanceof TaskLogAwareInterface) {
457            $queue->setTaskLog($this->getTaskLog());
458        }
459
460        return $queue;
461    }
462
463    /**
464     * @throws \InvalidArgumentException
465     */
466    private function assertQueues()
467    {
468        if (!$this->hasOption(self::OPTION_QUEUES) || empty($this->getOption(self::OPTION_QUEUES))) {
469            throw new \InvalidArgumentException("Queues needs to be set.");
470        }
471
472        if (count($this->getOption(self::OPTION_QUEUES)) === 1) {
473            return;
474        }
475
476        if (
477            count($this->getOption(self::OPTION_QUEUES)) != count(array_unique($this->getOption(self::OPTION_QUEUES)))
478        ) {
479            throw new \InvalidArgumentException(
480                'There are duplicated Queue names. Please check the values of "' . self::OPTION_QUEUES
481                . '" in your queue dispatcher settings.'
482            );
483        }
484    }
485
486    /**
487     * @throws \InvalidArgumentException
488     */
489    private function assertTasks()
490    {
491        if (empty($this->getLinkedTasks())) {
492            return;
493        }
494
495        // check if every task is linked to a registered queue
496        $notRegisteredQueues = array_diff(array_values($this->getLinkedTasks()), $this->getQueueNames());
497
498        if (count($notRegisteredQueues)) {
499            throw new \LogicException(
500                'Found not registered queue(s) linked to task(s): "' . implode('", "', $notRegisteredQueues)
501                    . '". Please check the values of "' . self::OPTION_TASK_TO_QUEUE_ASSOCIATIONS
502                    . '" in your queue dispatcher settings.'
503            );
504        }
505    }
506}