Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
0.00% covered (danger)
0.00%
0 / 181
0.00% covered (danger)
0.00%
0 / 33
CRAP
0.00% covered (danger)
0.00%
0 / 1
QueueDispatcher
0.00% covered (danger)
0.00%
0 / 181
0.00% covered (danger)
0.00%
0 / 33
6480
0.00% covered (danger)
0.00%
0 / 1
 __construct
0.00% covered (danger)
0.00%
0 / 11
0.00% covered (danger)
0.00%
0 / 1
42
 __toPhpCode
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 getQueueForTask
0.00% covered (danger)
0.00%
0 / 11
0.00% covered (danger)
0.00%
0 / 1
42
 getQueueNames
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 setQueues
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 addQueue
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
6
 hasQueue
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getQueue
0.00% covered (danger)
0.00%
0 / 6
0.00% covered (danger)
0.00%
0 / 1
6
 getQueues
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
6
 linkTaskToQueue
0.00% covered (danger)
0.00%
0 / 10
0.00% covered (danger)
0.00%
0 / 1
12
 getLinkedTasks
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getDefaultQueue
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
12
 getFirstQueue
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 getQueueByWeight
0.00% covered (danger)
0.00%
0 / 9
0.00% covered (danger)
0.00%
0 / 1
12
 setTaskSelector
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 initialize
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
6
 createTask
0.00% covered (danger)
0.00%
0 / 12
0.00% covered (danger)
0.00%
0 / 1
20
 setOwner
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getOwner
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 enqueue
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
12
 dequeue
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
12
 acknowledge
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 count
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
2
 isSync
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
12
 getWaitTime
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getTaskLog
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 runWorker
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
 propagateServices
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
6
 assertQueues
0.00% covered (danger)
0.00%
0 / 9
0.00% covered (danger)
0.00%
0 / 1
30
 assertTasks
0.00% covered (danger)
0.00%
0 / 9
0.00% covered (danger)
0.00%
0 / 1
12
 getTaskResource
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 getReportByLinkedResource
0.00% covered (danger)
0.00%
0 / 12
0.00% covered (danger)
0.00%
0 / 1
20
 linkTaskToResource
0.00% covered (danger)
0.00%
0 / 22
0.00% covered (danger)
0.00%
0 / 1
42
1<?php
2
3/**
4 * This program is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU General Public License
6 * as published by the Free Software Foundation; under version 2
7 * of the License (non-upgradable).
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the Free Software
16 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
17 *
18 * Copyright (c) 2017 (original work) Open Assessment Technologies SA (under the project TAO-PRODUCT);
19 *
20 */
21
22namespace oat\taoTaskQueue\model;
23
24use common_report_Report as Report;
25use oat\generis\model\OntologyAwareTrait;
26use oat\oatbox\service\ConfigurableService;
27use oat\oatbox\log\LoggerAwareTrait;
28use oat\oatbox\task\Task;
29use oat\tao\model\taskQueue\Queue\TaskSelector\SelectorStrategyInterface;
30use oat\taoTaskQueue\model\TaskSelector\WeightStrategy;
31use oat\taoTaskQueue\model\Task\CallbackTask;
32use oat\taoTaskQueue\model\Task\CallbackTaskInterface;
33use oat\taoTaskQueue\model\Task\TaskInterface;
34
/**
 * Class QueueDispatcher
 *
 * Dispatches tasks to the queues configured under OPTION_QUEUES. For every task the target
 * queue is resolved (see getQueueForTask()), and when no queue name is given on dequeue the
 * configured selector strategy picks the next task among all queues.
 *
 * @deprecated Use \oat\tao\model\taskQueue\QueueDispatcher
 *
 * @author Gyula Szucs <gyula@taotesting.com>
 */
class QueueDispatcher extends ConfigurableService implements QueueDispatcherInterface
{
    use LoggerAwareTrait;
    use OntologyAwareTrait;

    /**
     * Task log service, resolved lazily by getTaskLog().
     *
     * @var TaskLogInterface
     */
    private $taskLog;

    /** @var string|null Explicit owner; when null, getOwner() falls back to the session user. */
    private $owner;

    /** @var SelectorStrategyInterface Strategy used by dequeue() and getWaitTime(). */
    private $selectorStrategy;

    /** @var bool True once services have been propagated to the currently configured queues. */
    private $propagated = false;

    /**
     * QueueDispatcher constructor.
     *
     * Validates the queue and task-to-queue configuration, selects the task selector strategy
     * (WeightStrategy when none is configured) and requires the task log option to be set.
     *
     * @param array $options
     * @throws \InvalidArgumentException When no queue is configured or queue names are duplicated.
     * @throws \LogicException           When a task is linked to a queue which is not registered.
     * @throws \common_exception_Error   When the selector strategy is invalid or the task log is missing.
     */
    public function __construct(array $options)
    {
        parent::__construct($options);

        $this->assertQueues();
        $this->assertTasks();

        $strategy = $this->hasOption(self::OPTION_TASK_SELECTOR_STRATEGY)
            ? $this->getOption(self::OPTION_TASK_SELECTOR_STRATEGY)
            : null;

        if (empty($strategy)) {
            // setting default strategy
            $this->selectorStrategy = new WeightStrategy();
        } elseif ($strategy instanceof SelectorStrategyInterface) {
            // using the strategy set in the options
            $this->selectorStrategy = $strategy;
        } else {
            throw new \common_exception_Error('Task selector must implement ' . SelectorStrategyInterface::class);
        }

        if (!$this->hasOption(self::OPTION_TASK_LOG) || empty($this->getOption(self::OPTION_TASK_LOG))) {
            throw new \common_exception_Error('Task Log service needs to be set.');
        }
    }

    /**
     * @inheritdoc
     */
    public function __toPhpCode()
    {
        // to propagate the required services
        $this->getQueues();

        return parent::__toPhpCode();
    }

    /**
     * Resolves the queue a task belongs to.
     *
     * Resolution order:
     *  1. the queue name returned by the action itself, when it implements QueueAssociableInterface
     *     and returns a non-empty name;
     *  2. the queue linked to the action's class in the task-to-queue associations;
     *  3. the default queue.
     *
     * The "action" is the callable object of a callback task when that callable is an object,
     * otherwise the task itself.
     *
     * @param TaskInterface $task
     * @return QueueInterface
     */
    protected function getQueueForTask(TaskInterface $task)
    {
        $action = $task instanceof CallbackTaskInterface && is_object($task->getCallable())
            ? $task->getCallable()
            : $task;

        // getting queue name using the implemented getter function
        if (
            $action instanceof QueueAssociableInterface
            && ($queueName = $action->getQueueName($task->getParameters()))
        ) {
            return $this->getQueue($queueName);
        }

        // getting the queue name based on the linked tasks configuration
        $linkedTasks = $this->getLinkedTasks();
        $className = get_class($action);

        if (array_key_exists($className, $linkedTasks)) {
            return $this->getQueue($linkedTasks[$className]);
        }

        // if we still don't have a queue, let's use the default one
        return $this->getDefaultQueue();
    }

    /**
     * Reads the names straight from the option, without going through getQueues(), so that no
     * service propagation is triggered (this method is used by the constructor assertions).
     *
     * @inheritdoc
     */
    public function getQueueNames()
    {
        // cast so that a missing option yields an empty list instead of an array_map() failure
        return array_map(function (QueueInterface $queue) {
            return $queue->getName();
        }, (array) $this->getOption(self::OPTION_QUEUES));
    }

    /**
     * @inheritdoc
     */
    public function setQueues(array $queues)
    {
        // the new queues have not received their services yet
        $this->propagated = false;

        $this->setOption(self::OPTION_QUEUES, $queues);

        return $this;
    }

    /**
     * @inheritdoc
     * @throws \LogicException When a queue with the same name is already registered.
     */
    public function addQueue(QueueInterface $queue)
    {
        if ($this->hasQueue($queue->getName())) {
            // use the name explicitly instead of relying on the queue being castable to string
            throw new \LogicException('Queue "' . $queue->getName() . '" is already registered.');
        }

        $queues = $this->getQueues();
        $queues[] = $queue;

        $this->setOption(self::OPTION_QUEUES, $queues);

        // Reset the flag only AFTER the queue has been stored: getQueues() above marks the
        // dispatcher as propagated, which would otherwise leave the new queue without services.
        $this->propagated = false;

        return $this;
    }

    /**
     * @inheritdoc
     */
    public function hasQueue($queueName)
    {
        // strict comparison, consistent with the name matching done in getQueue()
        return in_array($queueName, $this->getQueueNames(), true);
    }

    /**
     * @inheritdoc
     * @throws \InvalidArgumentException When there is not exactly one queue with the given name.
     */
    public function getQueue($queueName)
    {
        $foundQueue = array_filter($this->getQueues(), function (QueueInterface $queue) use ($queueName) {
            return $queue->getName() === $queueName;
        });

        if (count($foundQueue) === 1) {
            return reset($foundQueue);
        }

        throw new \InvalidArgumentException('Queue "' . $queueName . '" does not exist.');
    }

    /**
     * Returns the configured queues, propagating the services to them on first access
     * (and again after the queue list has been changed).
     *
     * @return QueueInterface[]
     */
    public function getQueues()
    {
        // always hand back an array, even when the option is missing
        $queues = (array) $this->getOption(self::OPTION_QUEUES);

        if (!$this->propagated) {
            // propagate the services for the queues first
            foreach ($queues as $queue) {
                $this->propagateServices($queue);
            }

            $this->propagated = true;
        }

        return $queues;
    }

    /**
     * Accepts either a task class name or a task object (its class name is used).
     *
     * @inheritdoc
     * @throws \LogicException When the given queue is not registered.
     */
    public function linkTaskToQueue($taskName, $queueName)
    {
        if (is_object($taskName)) {
            $taskName = get_class($taskName);
        }

        if (!$this->hasQueue($queueName)) {
            throw new \LogicException(
                'Task "' . $taskName . '" cannot be added to "' . $queueName . '". Queue is not registered.'
            );
        }

        $tasks = $this->getLinkedTasks();

        $tasks[ (string) $taskName ] = $queueName;

        $this->setOption(self::OPTION_TASK_TO_QUEUE_ASSOCIATIONS, $tasks);

        return $this;
    }

    /**
     * @inheritdoc
     */
    public function getLinkedTasks()
    {
        return (array) $this->getOption(self::OPTION_TASK_TO_QUEUE_ASSOCIATIONS);
    }

    /**
     * Returns the queue named in OPTION_DEFAULT_QUEUE when that option is set to a non-empty
     * value, otherwise the first configured queue.
     *
     * @return QueueInterface
     */
    public function getDefaultQueue()
    {
        return $this->hasOption(self::OPTION_DEFAULT_QUEUE) && $this->getOption(self::OPTION_DEFAULT_QUEUE)
            ? $this->getQueue($this->getOption(self::OPTION_DEFAULT_QUEUE))
            : $this->getFirstQueue();
    }

    /**
     * Return the first queue from the array.
     *
     * Note: reset() yields false when no queue is configured.
     *
     * @return QueueInterface
     */
    protected function getFirstQueue()
    {
        $queues = $this->getQueues();

        return reset($queues);
    }

    /**
     * Gets random queue based on weight.
     *
     * For example, an array like ['A'=>5, 'B'=>45, 'C'=>50] means that "A" has a 5% chance of being selected,
     * "B" 45%, and "C" 50%.
     * The values are simply relative to each other. If one value weight was 2, and the other weight of 1,
     * the value with the weight of 2 has about a 66% chance of being selected.
     *
     * When the total weight is lower than 1 no weighted draw is possible and the first queue is returned.
     *
     * @deprecated
     * @return QueueInterface
     */
    public function getQueueByWeight()
    {
        $queues = $this->getQueues();

        $totalWeight = array_sum(array_map(function (QueueInterface $queue) {
            return $queue->getWeight();
        }, $queues));

        // mt_rand(1, 0) is invalid (ValueError on PHP 8, warning on PHP 7), so do not draw at all
        if ($totalWeight < 1) {
            return $this->getFirstQueue();
        }

        $rand = mt_rand(1, $totalWeight);

        /** @var Queue $queue */
        foreach ($queues as $queue) {
            $rand -= $queue->getWeight();
            if ($rand <= 0) {
                $this->logDebug('Queue "' . strtoupper($queue->getName()) . '" selected by weight.');
                return $queue;
            }
        }

        // safety net so that the method never returns null implicitly
        return $this->getFirstQueue();
    }

    /**
     * Stores the strategy in the options and makes the running instance use it right away.
     *
     * @inheritdoc
     */
    public function setTaskSelector(SelectorStrategyInterface $selectorStrategy)
    {
        $this->setOption(self::OPTION_TASK_SELECTOR_STRATEGY, $selectorStrategy);

        // dequeue() and getWaitTime() read this property, which is otherwise only set in the constructor
        $this->selectorStrategy = $selectorStrategy;

        return $this;
    }

    /**
     * Initialize every configured queue.
     *
     * @return void
     */
    public function initialize()
    {
        foreach ($this->getQueues() as $queue) {
            $queue->initialize();
        }
    }

    /**
     * Builds a CallbackTask around the callable and enqueues it. The owner is taken from the
     * parent task when one is given, otherwise from getOwner(). The task is marked as enqueued
     * only when enqueue() reports success; it is returned in both cases.
     *
     * @inheritdoc
     */
    public function createTask(
        callable $callable,
        array $parameters = [],
        $label = null,
        TaskInterface $parent = null,
        $masterStatus = false
    ) {
        $id = \common_Utils::getNewUri();
        $owner = $parent ? $parent->getOwner() : $this->getOwner();

        $callbackTask = new CallbackTask($id, $owner);
        $callbackTask->setCallable($callable)
            ->setParameter($parameters);

        if ($parent) {
            $callbackTask->setParentId($parent->getId());
        }

        $callbackTask->setMasterStatus($masterStatus);

        if ($this->enqueue($callbackTask, $label)) {
            $callbackTask->markAsEnqueued();
        }

        return $callbackTask;
    }

    /**
     * @param string $owner
     */
    public function setOwner($owner)
    {
        $this->owner = $owner;
    }

    /**
     * Returns the explicitly set owner, or the identifier of the current session user
     * when no owner has been set.
     *
     * @return string
     * @throws \common_exception_Error
     */
    public function getOwner()
    {
        if (is_null($this->owner)) {
            return \common_session_SessionManager::getSession()->getUser()->getIdentifier();
        }

        return $this->owner;
    }

    /**
     * Puts the task into its queue and, when that queue is synchronous, runs a worker on it
     * straightaway.
     *
     * @param TaskInterface $task
     * @param null|string   $label
     * @return bool Whether the queue accepted the task.
     */
    public function enqueue(TaskInterface $task, $label = null)
    {
        $queue = $this->getQueueForTask($task);
        $isEnqueued = $queue->enqueue($task, $label);

        // if we need to run the task straightaway
        if ($isEnqueued && $queue->isSync()) {
            $this->runWorker($queue);
        }

        return $isEnqueued;
    }

    /**
     * @inheritdoc
     */
    public function dequeue($queueName = null)
    {
        if (!is_null($queueName)) {
            return $this->getQueue($queueName)->dequeue();
        }

        // if there is only one queue defined, let's use that
        if (count($this->getQueues()) === 1) {
            return $this->getFirstQueue()->dequeue();
        }

        // default: getting a task using the current task selector strategy
        return $this->selectorStrategy->pickNextTask($this->getQueues());
    }

    /**
     * @inheritdoc
     */
    public function acknowledge(TaskInterface $task)
    {
        $this->getQueueForTask($task)->acknowledge($task);
    }

    /**
     * Count of messages in all queues.
     *
     * @return int
     */
    public function count()
    {
        $counts = array_map(function (QueueInterface $queue) {
            return $queue->count();
        }, $this->getQueues());

        return array_sum($counts);
    }

    /**
     * True only when every configured queue is synchronous.
     *
     * @inheritdoc
     */
    public function isSync()
    {
        foreach ($this->getQueues() as $queue) {
            if (!$queue->isSync()) {
                return false;
            }
        }

        return true;
    }

    /**
     * Returns the wait time reported by the current task selector strategy.
     *
     * @return mixed Whatever the strategy's getWaitTime() returns.
     */
    public function getWaitTime()
    {
        return $this->selectorStrategy->getWaitTime();
    }

    /**
     * Resolves the task log service named by OPTION_TASK_LOG on first use and caches it.
     *
     * @return TaskLogInterface
     */
    protected function getTaskLog()
    {
        if (is_null($this->taskLog)) {
            $this->taskLog = $this->getServiceManager()->get($this->getOption(self::OPTION_TASK_LOG));
        }

        return $this->taskLog;
    }

    /**
     * Run worker on-the-fly for one round.
     *
     * NOTE(review): the meaning of the `false` constructor argument and of the `1` passed to
     * setDedicatedQueue() is defined by Worker, which is not visible here - presumably the
     * latter is the iteration limit; confirm against Worker.
     *
     * @param QueueInterface $queue
     */
    protected function runWorker(QueueInterface $queue)
    {
        (new Worker($this, $this->getTaskLog(), false))
            ->setDedicatedQueue($queue, 1)
            ->run();
    }

    /**
     * Propagates the service manager to the queue and hands it the task log when the queue
     * is task log aware.
     *
     * @param QueueInterface $queue
     * @return QueueInterface The same queue instance.
     */
    protected function propagateServices(QueueInterface $queue)
    {
        $this->getServiceManager()->propagate($queue);

        if ($queue instanceof TaskLogAwareInterface) {
            $queue->setTaskLog($this->getTaskLog());
        }

        return $queue;
    }

    /**
     * Checks that at least one queue is configured and that queue names are unique.
     *
     * @throws \InvalidArgumentException
     */
    private function assertQueues()
    {
        if (!$this->hasOption(self::OPTION_QUEUES) || empty($this->getOption(self::OPTION_QUEUES))) {
            throw new \InvalidArgumentException("Queues needs to be set.");
        }

        // a single queue cannot clash with anything
        if (count($this->getOption(self::OPTION_QUEUES)) === 1) {
            return;
        }

        // compare the names explicitly rather than relying on the queues being castable to string
        $queueNames = $this->getQueueNames();

        if (count($queueNames) !== count(array_unique($queueNames))) {
            throw new \InvalidArgumentException(
                'There are duplicated Queue names. Please check the values of "'
                    . self::OPTION_QUEUES . '" in your queue dispatcher settings.'
            );
        }
    }

    /**
     * Checks that every task-to-queue association points to a registered queue.
     *
     * @throws \LogicException
     */
    private function assertTasks()
    {
        $linkedTasks = $this->getLinkedTasks();

        if (empty($linkedTasks)) {
            return;
        }

        // check if every task is linked to a registered queue
        $notRegisteredQueues = array_diff(array_values($linkedTasks), $this->getQueueNames());

        if (!empty($notRegisteredQueues)) {
            throw new \LogicException(
                'Found not registered queue(s) linked to task(s): "'
                    . implode('", "', $notRegisteredQueues) . '". Please check the values of "'
                    . self::OPTION_TASK_TO_QUEUE_ASSOCIATIONS . '" in your queue dispatcher settings.'
            );
        }
    }

    /**
     * Get resource from rdf storage which represents task in the task queue by linked resource
     * Returns null if there is no task linked to given resource
     *
     * It will be deprecated once we have the general GUI for displaying different info of a task for the user.
     *
     * @deprecated
     *
     * @param \core_kernel_classes_Resource $resource
     * @return null|\core_kernel_classes_Resource
     */
    public function getTaskResource(\core_kernel_classes_Resource $resource)
    {
        $tasksRootClass = $this->getClass(Task::TASK_CLASS);
        $taskResources = $tasksRootClass->searchInstances([Task::PROPERTY_LINKED_RESOURCE => $resource->getUri()]);

        return empty($taskResources) ? null : current($taskResources);
    }

    /**
     * Builds the report of the task linked to the given resource: the stored report when there
     * is one, otherwise a report describing the task's current status (a failure report when
     * the status is "failed", an info report otherwise). A failure report is returned when no
     * task is linked to the resource.
     *
     * It will be deprecated once we have the general GUI for displaying different info of a task for the user.
     *
     * @deprecated
     *
     * @param \core_kernel_classes_Resource $resource
     * @return Report
     */
    public function getReportByLinkedResource(\core_kernel_classes_Resource $resource)
    {
        $taskResource = $this->getTaskResource($resource);

        if ($taskResource === null) {
            return Report::createFailure(__('Resource is not the task placeholder'));
        }

        $report = $taskResource->getOnePropertyValue($this->getProperty(Task::PROPERTY_REPORT));

        if ($report) {
            return Report::jsonUnserialize($report->literal);
        }

        $status = $this->getTaskLog()->getStatus($taskResource->getUri());
        $msg = __('Task is in \'%s\' state', $status);

        return $status == TaskLogInterface::STATUS_FAILED
            ? Report::createFailure($msg)
            : Report::createInfo($msg);
    }

    /**
     * Create task resource in the rdf storage and link placeholder resource to it.
     *
     * When all queues are synchronous, the report found in the task log (if any) is stored
     * on the task resource as well.
     *
     * It will be deprecated once we have the general GUI for displaying different info of a task for the user.
     *
     * @deprecated
     *
     * @param TaskInterface                      $task
     * @param \core_kernel_classes_Resource|null $resource - placeholder resource to be linked with task.
     * @return \core_kernel_classes_Resource
     */
    public function linkTaskToResource(TaskInterface $task, \core_kernel_classes_Resource $resource = null)
    {
        $taskResource = $this->getResource($task->getId());

        if (!$taskResource->exists()) {
            $tasksRootClass = $this->getClass(Task::TASK_CLASS);
            $taskResource = $tasksRootClass->createInstance('', '', $task->getId());
        }

        if ($resource !== null) {
            $taskResource->setPropertyValue($this->getProperty(Task::PROPERTY_LINKED_RESOURCE), $resource->getUri());
        }

        if ($this->isSync()) {
            $report = $this->getTaskLog()->getReport($task->getId());

            if (!empty($report)) {
                // Serialize only two first report levels because sometimes serialized report is huge and it does not
                // fit into `k_po` index of statements table.
                $serializableReport = new Report($report->getType(), $report->getMessage(), $report->getData());

                foreach ($report as $subReport) {
                    $serializableSubReport = new Report(
                        $subReport->getType(),
                        $subReport->getMessage(),
                        $subReport->getData()
                    );
                    $serializableReport->add($serializableSubReport);
                }

                $taskResource->setPropertyValue(
                    $this->getProperty(Task::PROPERTY_REPORT),
                    json_encode($serializableReport)
                );
            }
        }

        return $taskResource;
    }
}