Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
37.31% covered (danger)
37.31%
25 / 67
42.86% covered (danger)
42.86%
3 / 7
CRAP
0.00% covered (danger)
0.00%
0 / 1
QueueAssociationService
37.31% covered (danger)
37.31%
25 / 67
42.86% covered (danger)
42.86%
3 / 7
47.47
0.00% covered (danger)
0.00%
0 / 1
 associate
100.00% covered (success)
100.00%
19 / 19
100.00% covered (success)
100.00%
1 / 1
1
 getTargetClass
100.00% covered (success)
100.00%
5 / 5
100.00% covered (success)
100.00%
1 / 1
3
 getQueueDispatcher
100.00% covered (success)
100.00%
1 / 1
100.00% covered (success)
100.00%
1 / 1
1
 associateBulk
0.00% covered (danger)
0.00%
0 / 18
0.00% covered (danger)
0.00%
0 / 1
6
 guessDefaultBrokerType
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
2
 deleteAndRemoveAssociations
0.00% covered (danger)
0.00%
0 / 18
0.00% covered (danger)
0.00%
0 / 1
12
 getBrokerFactory
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
1<?php
2
3/**
4 * This program is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU General Public License
6 * as published by the Free Software Foundation; under version 2
7 * of the License (non-upgradable).
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the Free Software
16 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
17 *
18 * Copyright (c) 2020 (original work) Open Assessment Technologies SA;
19 */
20
21declare(strict_types=1);
22
23namespace oat\taoTaskQueue\model\Service;
24
25use common_exception_Error;
26use InvalidArgumentException;
27use oat\oatbox\action\Action;
28use oat\oatbox\service\ConfigurableService;
29use oat\tao\model\taskQueue\Queue;
30use oat\tao\model\taskQueue\QueueDispatcher;
31use oat\tao\model\taskQueue\QueueDispatcherInterface;
32use oat\taoTaskQueue\model\QueueBroker\RdsQueueBroker;
33use oat\taoTaskQueue\scripts\tools\BrokerFactory;
34use oat\taoTaskQueue\scripts\tools\InitializeQueue;
35
/**
 * Maintains the queue list and the task-to-queue association map stored in the
 * options of the queue dispatcher service, and re-registers the dispatcher after
 * every change so that the updated options are persisted by the service manager.
 */
class QueueAssociationService extends ConfigurableService
{
    /**
     * Adds a queue named $queue to the dispatcher and maps $taskClass to it.
     *
     * @param string $taskClass Fully qualified class name; must exist and be an Action.
     * @param string $queue     Name of the queue the task class is associated with.
     *
     * @return InitializeQueue A propagated initializer instance; it is not invoked here.
     *
     * @throws InvalidArgumentException When $taskClass does not exist or is not an Action.
     */
    public function associate(string $taskClass, string $queue): InitializeQueue
    {
        $targetClass = $this->getTargetClass($taskClass);

        // Resolve the dispatcher once so that the instance receiving the new options
        // is guaranteed to be the very same instance that gets registered below.
        $queueService = $this->getQueueDispatcher();

        $existingOptions = $queueService->getOptions();
        // An option that was never configured must behave like an empty list/map.
        $existingQueues = $queueService->getOption(QueueDispatcherInterface::OPTION_QUEUES) ?? [];
        $existingAssociations = $queueService->getOption(
            QueueDispatcherInterface::OPTION_TASK_TO_QUEUE_ASSOCIATIONS
        ) ?? [];

        // NOTE(review): 'default', 1 and 30 are presumably the persistence id, the number
        // of tasks received per poll and the queue weight - confirm against the
        // RdsQueueBroker and Queue constructors.
        $newQueue = new Queue($queue, new RdsQueueBroker('default', 1), 30);

        // array_unique() compares entries by their string representation, so this relies
        // on Queue objects being convertible to string (presumably their name - confirm).
        $existingOptions[QueueDispatcherInterface::OPTION_QUEUES] = array_unique(
            array_merge($existingQueues, [$newQueue])
        );
        $existingOptions[QueueDispatcherInterface::OPTION_TASK_TO_QUEUE_ASSOCIATIONS] = array_merge(
            $existingAssociations,
            [$targetClass => $queue]
        );

        $queueService->setOptions($existingOptions);
        $this->getServiceManager()->register(QueueDispatcherInterface::SERVICE_ID, $queueService);

        $initializer = new InitializeQueue();
        $this->propagate($initializer);

        return $initializer;
    }

    /**
     * Validates that $taskClass names an existing class that is an Action.
     *
     * @throws InvalidArgumentException When the class is missing or is not an Action.
     */
    private function getTargetClass(string $taskClass): string
    {
        if (!class_exists($taskClass) || !is_a($taskClass, Action::class, true)) {
            throw new InvalidArgumentException(
                sprintf('%s - Task must extend %s', $taskClass, Action::class)
            );
        }

        return $taskClass;
    }

    /**
     * Fetches the queue dispatcher from the service locator.
     */
    private function getQueueDispatcher(): QueueDispatcher
    {
        return $this->getServiceLocator()->get(QueueDispatcher::SERVICE_ID);
    }

    /**
     * Merges $newAssociations into the dispatcher's task-to-queue map and, when no
     * queue named $newQueueName is known to the dispatcher yet, creates one whose
     * broker type is taken from the first configured queue.
     *
     * @param string               $newQueueName    Name of the queue to create if missing.
     * @param array<string,string> $newAssociations Entries merged over the existing map
     *                                              (presumably task class => queue name).
     *
     * @return Queue|null The queue created by this call, or null when it already existed.
     *
     * @throws common_exception_Error
     * @throws \RuntimeException When a queue must be created but no queue is configured
     *                           to derive the broker type from.
     */
    public function associateBulk(
        string $newQueueName,
        array $newAssociations
    ): ?Queue {
        $factory = $this->getBrokerFactory();
        $queueService = $this->getQueueDispatcher();
        $existingOptions = $queueService->getOptions();
        $newQueue = null;

        // Strict comparison: queue names are strings and must never match loosely.
        if (!in_array($newQueueName, $queueService->getQueueNames(), true)) {
            $existingQueues = $queueService->getOption(QueueDispatcherInterface::OPTION_QUEUES) ?? [];

            $broker = $factory->create($this->guessDefaultBrokerType(), 'default', 2);
            $newQueue = new Queue($newQueueName, $broker, 30);
            $this->propagate($broker);

            $existingOptions[QueueDispatcherInterface::OPTION_QUEUES] = array_merge($existingQueues, [$newQueue]);
        }

        // A dispatcher without any association configured yet is treated as an empty map.
        $existingAssociations = $existingOptions[QueueDispatcherInterface::OPTION_TASK_TO_QUEUE_ASSOCIATIONS] ?? [];

        $existingOptions[QueueDispatcherInterface::OPTION_TASK_TO_QUEUE_ASSOCIATIONS] = array_merge(
            $existingAssociations,
            $newAssociations
        );

        $queueService->setOptions($existingOptions);
        $this->getServiceManager()->register(QueueDispatcherInterface::SERVICE_ID, $queueService);

        return $newQueue;
    }

    /**
     * Returns the broker id of the first queue configured on the dispatcher.
     *
     * @throws \RuntimeException When the dispatcher has no queue configured.
     */
    public function guessDefaultBrokerType(): string
    {
        $existingQueues = $this->getQueueDispatcher()->getOption(QueueDispatcherInterface::OPTION_QUEUES) ?? [];

        // reset() yields the first entry whatever its key is, and false for an empty list.
        /** @var Queue|false $queue */
        $queue = reset($existingQueues);

        if ($queue === false) {
            throw new \RuntimeException(
                'Unable to guess the default broker type: no queue is configured on the queue dispatcher.'
            );
        }

        $this->propagate($queue);

        return $queue->getBroker()->getBrokerId();
    }

    /**
     * Removes the queue named $queueNameForRemoval from the dispatcher together with
     * every task association that points to it, then re-registers the dispatcher.
     */
    public function deleteAndRemoveAssociations(string $queueNameForRemoval): void
    {
        /** @var QueueDispatcher $queueService */
        $queueService = $this->getServiceManager()->get(QueueDispatcher::SERVICE_ID);
        $existingOptions = $queueService->getOptions();

        $existingQueues = $queueService->getOption(QueueDispatcherInterface::OPTION_QUEUES) ?? [];
        // array_values() re-indexes the result so the stored queues remain a plain list.
        $existingOptions[QueueDispatcherInterface::OPTION_QUEUES] = array_values(
            array_filter(
                $existingQueues,
                static function ($queue) use ($queueNameForRemoval) {
                    /** @var Queue $queue */
                    return $queue->getName() !== $queueNameForRemoval;
                }
            )
        );

        $existingAssociations = $existingOptions[QueueDispatcherInterface::OPTION_TASK_TO_QUEUE_ASSOCIATIONS] ?? [];
        // Keys are preserved by array_filter(), so the remaining map entries stay intact.
        $existingOptions[QueueDispatcherInterface::OPTION_TASK_TO_QUEUE_ASSOCIATIONS] = array_filter(
            $existingAssociations,
            static function ($queueName) use ($queueNameForRemoval) {
                return $queueNameForRemoval !== $queueName;
            }
        );

        $queueService->setOptions($existingOptions);
        $this->getServiceManager()->register(QueueDispatcherInterface::SERVICE_ID, $queueService);
    }

    /**
     * Fetches the broker factory from the service manager.
     */
    private function getBrokerFactory(): BrokerFactory
    {
        return $this->getServiceManager()->get(BrokerFactory::class);
    }
}