Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
0.00% |
0 / 137 |
|
0.00% |
0 / 29 |
CRAP | |
0.00% |
0 / 1 |
QueueDispatcher | |
0.00% |
0 / 137 |
|
0.00% |
0 / 29 |
4422 | |
0.00% |
0 / 1 |
__construct | |
0.00% |
0 / 11 |
|
0.00% |
0 / 1 |
42 | |||
getQueueForTask | |
0.00% |
0 / 11 |
|
0.00% |
0 / 1 |
42 | |||
getQueueNames | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
2 | |||
setQueues | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
2 | |||
addQueue | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
6 | |||
removeQueue | |
0.00% |
0 / 6 |
|
0.00% |
0 / 1 |
12 | |||
hasQueue | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getQueue | |
0.00% |
0 / 6 |
|
0.00% |
0 / 1 |
6 | |||
getQueues | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
6 | |||
linkTaskToQueue | |
0.00% |
0 / 10 |
|
0.00% |
0 / 1 |
12 | |||
getLinkedTasks | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getDefaultQueue | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
12 | |||
getFirstQueue | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
setTaskSelector | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
initialize | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
6 | |||
createTask | |
0.00% |
0 / 12 |
|
0.00% |
0 / 1 |
20 | |||
setOwner | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getOwner | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
6 | |||
enqueue | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
12 | |||
dequeue | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
6 | |||
acknowledge | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
count | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
2 | |||
isSync | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
12 | |||
getWaitTime | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
hasPreFetchedMessages | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getTaskLog | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
6 | |||
propagateServices | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
6 | |||
assertQueues | |
0.00% |
0 / 9 |
|
0.00% |
0 / 1 |
30 | |||
assertTasks | |
0.00% |
0 / 9 |
|
0.00% |
0 / 1 |
12 |
1 | <?php |
2 | |
3 | /** |
4 | * This program is free software; you can redistribute it and/or |
5 | * modify it under the terms of the GNU General Public License |
6 | * as published by the Free Software Foundation; under version 2 |
7 | * of the License (non-upgradable). |
8 | * |
9 | * This program is distributed in the hope that it will be useful, |
10 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
11 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
12 | * GNU General Public License for more details. |
13 | * |
14 | * You should have received a copy of the GNU General Public License |
15 | * along with this program; if not, write to the Free Software |
16 | * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
17 | * |
18 | * Copyright (c) 2017 (original work) Open Assessment Technologies SA (under the project TAO-PRODUCT); |
19 | * |
20 | */ |
21 | |
22 | namespace oat\tao\model\taskQueue; |
23 | |
24 | use oat\generis\model\OntologyAwareTrait; |
25 | use oat\oatbox\service\ConfigurableService; |
26 | use oat\oatbox\log\LoggerAwareTrait; |
27 | use oat\tao\model\taskQueue\Queue\TaskSelector\SelectorStrategyInterface; |
28 | use oat\tao\model\taskQueue\Queue\TaskSelector\WeightStrategy; |
29 | use oat\tao\model\taskQueue\Task\CallbackTask; |
30 | use oat\tao\model\taskQueue\Task\CallbackTaskInterface; |
31 | use oat\tao\model\taskQueue\Task\QueueAssociableInterface; |
32 | use oat\tao\model\taskQueue\Task\TaskInterface; |
33 | use oat\tao\model\taskQueue\TaskLog\TaskLogAwareInterface; |
34 | use oat\tao\model\taskQueue\Worker\OneTimeWorker; |
35 | |
36 | /** |
37 | * @author Gyula Szucs <gyula@taotesting.com> |
38 | */ |
39 | class QueueDispatcher extends ConfigurableService implements QueueDispatcherInterface |
40 | { |
41 | use LoggerAwareTrait; |
42 | use OntologyAwareTrait; |
43 | |
44 | /** |
45 | * @var TaskLogInterface |
46 | */ |
47 | private $taskLog; |
48 | |
49 | /** @var string */ |
50 | private $owner; |
51 | |
52 | /** @var SelectorStrategyInterface */ |
53 | private $selectorStrategy; |
54 | |
55 | private $propagated = false; |
56 | |
57 | /** |
58 | * QueueDispatcher constructor. |
59 | * |
60 | * @param array $options |
61 | * @throws \common_exception_Error |
62 | */ |
63 | public function __construct(array $options) |
64 | { |
65 | parent::__construct($options); |
66 | |
67 | $this->assertQueues(); |
68 | |
69 | $this->assertTasks(); |
70 | |
71 | if ( |
72 | !$this->hasOption(self::OPTION_TASK_SELECTOR_STRATEGY) |
73 | || empty($this->getOption(self::OPTION_TASK_SELECTOR_STRATEGY)) |
74 | ) { |
75 | // setting default strategy |
76 | $this->selectorStrategy = new WeightStrategy(); |
77 | } else { |
78 | // using the strategy set in the options |
79 | if (!is_a($this->getOption(self::OPTION_TASK_SELECTOR_STRATEGY), SelectorStrategyInterface::class)) { |
80 | throw new \common_exception_Error('Task selector must implement ' . SelectorStrategyInterface::class); |
81 | } |
82 | |
83 | $this->selectorStrategy = $this->getOption(self::OPTION_TASK_SELECTOR_STRATEGY); |
84 | } |
85 | |
86 | if (!$this->hasOption(self::OPTION_TASK_LOG) || empty($this->getOption(self::OPTION_TASK_LOG))) { |
87 | throw new \common_exception_Error('Task Log service needs to be set.'); |
88 | } |
89 | } |
90 | |
91 | /** |
92 | * @param TaskInterface $task |
93 | * @return QueueInterface |
94 | */ |
95 | protected function getQueueForTask(TaskInterface $task) |
96 | { |
97 | $action = $task instanceof CallbackTaskInterface && is_object($task->getCallable()) |
98 | ? $task->getCallable() |
99 | : $task; |
100 | |
101 | // getting queue name using the implemented getter function |
102 | if ( |
103 | $action instanceof QueueAssociableInterface |
104 | && ($queueName = $action->getQueueName($task->getParameters())) |
105 | ) { |
106 | return $this->getQueue($queueName); |
107 | } |
108 | |
109 | // getting the queue name based on the linked tasks configuration |
110 | $className = get_class($action); |
111 | if (array_key_exists($className, $this->getLinkedTasks())) { |
112 | $queueName = $this->getLinkedTasks()[$className]; |
113 | |
114 | return $this->getQueue($queueName); |
115 | } |
116 | |
117 | // if we still don't have a queue, let's use the default one |
118 | return $this->getDefaultQueue(); |
119 | } |
120 | |
121 | /** |
122 | * @inheritdoc |
123 | */ |
124 | public function getQueueNames() |
125 | { |
126 | return array_map(function (QueueInterface $queue) { |
127 | return $queue->getName(); |
128 | }, $this->getOption(self::OPTION_QUEUES)); |
129 | } |
130 | |
131 | /** |
132 | * @inheritdoc |
133 | */ |
134 | public function setQueues(array $queues) |
135 | { |
136 | $this->propagated = false; |
137 | |
138 | $this->setOption(self::OPTION_QUEUES, $queues); |
139 | |
140 | return $this; |
141 | } |
142 | |
143 | /** |
144 | * @inheritdoc |
145 | * @throws \LogicException |
146 | */ |
147 | public function addQueue(QueueInterface $queue) |
148 | { |
149 | if ($this->hasQueue($queue->getName())) { |
150 | throw new \LogicException('Queue "' . $queue . '" is already registered.'); |
151 | } |
152 | |
153 | $this->propagated = false; |
154 | |
155 | $queues = $this->getQueues(); |
156 | $queues[] = $queue; |
157 | |
158 | $this->setOption(self::OPTION_QUEUES, $queues); |
159 | |
160 | return $this; |
161 | } |
162 | |
163 | public function removeQueue(string $queueName): self |
164 | { |
165 | $queues = $this->getQueues(); |
166 | |
167 | foreach ($queues as $key => $queue) { |
168 | if ($queue->getName() === $queueName) { |
169 | unset($queues[$key]); |
170 | } |
171 | } |
172 | |
173 | $this->setQueues(array_values($queues)); |
174 | |
175 | return $this; |
176 | } |
177 | |
178 | /** |
179 | * @inheritdoc |
180 | */ |
181 | public function hasQueue($queueName) |
182 | { |
183 | return in_array($queueName, $this->getQueueNames()); |
184 | } |
185 | |
186 | /** |
187 | * @inheritdoc |
188 | */ |
189 | public function getQueue($queueName) |
190 | { |
191 | $foundQueue = array_filter($this->getQueues(), function (QueueInterface $queue) use ($queueName) { |
192 | return $queue->getName() === $queueName; |
193 | }); |
194 | |
195 | if (count($foundQueue) === 1) { |
196 | return reset($foundQueue); |
197 | } |
198 | |
199 | throw new \InvalidArgumentException('Queue "' . $queueName . '" does not exist.'); |
200 | } |
201 | |
202 | /** |
203 | * @return QueueInterface[] |
204 | */ |
205 | public function getQueues() |
206 | { |
207 | if (!$this->propagated) { |
208 | $queues = (array) $this->getOption(self::OPTION_QUEUES); |
209 | |
210 | // propagate the services for the queues first |
211 | array_walk($queues, function (QueueInterface $queue) { |
212 | $this->propagateServices($queue); |
213 | }); |
214 | |
215 | $this->propagated = true; |
216 | } |
217 | |
218 | return $this->getOption(self::OPTION_QUEUES); |
219 | } |
220 | |
221 | /** |
222 | * @inheritdoc |
223 | */ |
224 | public function linkTaskToQueue($taskName, $queueName) |
225 | { |
226 | if (is_object($taskName)) { |
227 | $taskName = get_class($taskName); |
228 | } |
229 | |
230 | if (!$this->hasQueue($queueName)) { |
231 | throw new \LogicException( |
232 | 'Task "' . $taskName . '" cannot be added to "' . $queueName . '". Queue is not registered.' |
233 | ); |
234 | } |
235 | |
236 | $tasks = $this->getLinkedTasks(); |
237 | |
238 | $tasks[ (string) $taskName ] = $queueName; |
239 | |
240 | $this->setOption(self::OPTION_TASK_TO_QUEUE_ASSOCIATIONS, $tasks); |
241 | |
242 | return $this; |
243 | } |
244 | |
245 | /** |
246 | * @inheritdoc |
247 | */ |
248 | public function getLinkedTasks() |
249 | { |
250 | return (array) $this->getOption(self::OPTION_TASK_TO_QUEUE_ASSOCIATIONS); |
251 | } |
252 | |
253 | /** |
254 | * Return the first queue as a default one. |
255 | * Maybe, later we need other logic the determine the default queue. |
256 | * |
257 | * @return QueueInterface |
258 | */ |
259 | public function getDefaultQueue() |
260 | { |
261 | return $this->hasOption(self::OPTION_DEFAULT_QUEUE) && $this->getOption(self::OPTION_DEFAULT_QUEUE) |
262 | ? $this->getQueue($this->getOption(self::OPTION_DEFAULT_QUEUE)) |
263 | : $this->getFirstQueue(); |
264 | } |
265 | |
266 | /** |
267 | * Return the first queue from the array. |
268 | * |
269 | * @return QueueInterface |
270 | */ |
271 | protected function getFirstQueue() |
272 | { |
273 | $queues = $this->getQueues(); |
274 | |
275 | return reset($queues); |
276 | } |
277 | |
278 | /** |
279 | * @inheritdoc |
280 | */ |
281 | public function setTaskSelector(SelectorStrategyInterface $selectorStrategy) |
282 | { |
283 | $this->setOption(self::OPTION_TASK_SELECTOR_STRATEGY, $selectorStrategy); |
284 | |
285 | return $this; |
286 | } |
287 | |
288 | /** |
289 | * Initialize queue. |
290 | * |
291 | * @return void |
292 | */ |
293 | public function initialize() |
294 | { |
295 | foreach ($this->getQueues() as $queue) { |
296 | $queue->initialize(); |
297 | } |
298 | } |
299 | |
300 | /** |
301 | * @inheritdoc |
302 | */ |
303 | public function createTask( |
304 | callable $callable, |
305 | array $parameters = [], |
306 | $label = null, |
307 | TaskInterface $parent = null, |
308 | $masterStatus = false |
309 | ) { |
310 | $id = \common_Utils::getNewUri(); |
311 | $owner = $parent ? $parent->getOwner() : $this->getOwner(); |
312 | |
313 | $callbackTask = new CallbackTask($id, $owner); |
314 | $callbackTask->setCallable($callable) |
315 | ->setParameter($parameters); |
316 | |
317 | if ($parent) { |
318 | $callbackTask->setParentId($parent->getId()); |
319 | } |
320 | |
321 | $callbackTask->setMasterStatus($masterStatus); |
322 | |
323 | if ($this->enqueue($callbackTask, $label)) { |
324 | $callbackTask->markAsEnqueued(); |
325 | } |
326 | |
327 | return $callbackTask; |
328 | } |
329 | |
330 | /** |
331 | * @param string $owner |
332 | */ |
333 | public function setOwner($owner) |
334 | { |
335 | $this->owner = $owner; |
336 | } |
337 | |
338 | /** |
339 | * @return string |
340 | * @throws \common_exception_Error |
341 | */ |
342 | public function getOwner() |
343 | { |
344 | if (is_null($this->owner)) { |
345 | return \common_session_SessionManager::getSession()->getUser()->getIdentifier(); |
346 | } |
347 | |
348 | return $this->owner; |
349 | } |
350 | |
351 | /** |
352 | * @param TaskInterface $task |
353 | * @param null|string $label |
354 | * @return bool |
355 | */ |
356 | public function enqueue(TaskInterface $task, $label = null) |
357 | { |
358 | $queue = $this->getQueueForTask($task); |
359 | $isEnqueued = $queue->enqueue($task, $label); |
360 | |
361 | // if we need to run the task straightaway, then run a worker on-the-fly for one round. |
362 | if ($isEnqueued && $queue->isSync()) { |
363 | $oneTimeWorker = new OneTimeWorker($queue, $this->getTaskLog()); |
364 | $this->propagate($oneTimeWorker); |
365 | $oneTimeWorker->run(); |
366 | } |
367 | |
368 | return $isEnqueued; |
369 | } |
370 | |
371 | /** |
372 | * Receive a task from a specified queue or from a queue selected by a predefined strategy |
373 | * |
374 | * @inheritdoc |
375 | */ |
376 | public function dequeue() |
377 | { |
378 | // if there is only one queue defined, let's use that |
379 | if (count($this->getQueues()) === 1) { |
380 | return $this->getFirstQueue()->dequeue(); |
381 | } |
382 | |
383 | // default: getting a task using the current task selector strategy |
384 | return $this->selectorStrategy->pickNextTask($this->getQueues()); |
385 | } |
386 | |
387 | /** |
388 | * @inheritdoc |
389 | */ |
390 | public function acknowledge(TaskInterface $task) |
391 | { |
392 | $this->getQueueForTask($task)->acknowledge($task); |
393 | } |
394 | |
395 | /** |
396 | * Count of messages in all queues. |
397 | * |
398 | * @return int |
399 | */ |
400 | public function count(): int |
401 | { |
402 | $counts = array_map(function (QueueInterface $queue) { |
403 | return $queue->count(); |
404 | }, $this->getQueues()); |
405 | |
406 | return array_sum($counts); |
407 | } |
408 | |
409 | /** |
410 | * @inheritdoc |
411 | */ |
412 | public function isSync() |
413 | { |
414 | foreach ($this->getQueues() as $queue) { |
415 | if (!$queue->isSync()) { |
416 | return false; |
417 | } |
418 | } |
419 | |
420 | return true; |
421 | } |
422 | |
423 | public function getWaitTime() |
424 | { |
425 | return $this->selectorStrategy->getWaitTime(); |
426 | } |
427 | |
428 | /** |
429 | * @inheritdoc |
430 | */ |
431 | public function hasPreFetchedMessages(): bool |
432 | { |
433 | return true; |
434 | } |
435 | |
436 | /** |
437 | * @return TaskLogInterface |
438 | */ |
439 | protected function getTaskLog() |
440 | { |
441 | if (is_null($this->taskLog)) { |
442 | $this->taskLog = $this->getServiceLocator()->get($this->getOption(self::OPTION_TASK_LOG)); |
443 | } |
444 | |
445 | return $this->taskLog; |
446 | } |
447 | |
448 | /** |
449 | * @param QueueInterface $queue |
450 | * @return QueueInterface |
451 | */ |
452 | protected function propagateServices(QueueInterface $queue) |
453 | { |
454 | $this->propagate($queue); |
455 | |
456 | if ($queue instanceof TaskLogAwareInterface) { |
457 | $queue->setTaskLog($this->getTaskLog()); |
458 | } |
459 | |
460 | return $queue; |
461 | } |
462 | |
463 | /** |
464 | * @throws \InvalidArgumentException |
465 | */ |
466 | private function assertQueues() |
467 | { |
468 | if (!$this->hasOption(self::OPTION_QUEUES) || empty($this->getOption(self::OPTION_QUEUES))) { |
469 | throw new \InvalidArgumentException("Queues needs to be set."); |
470 | } |
471 | |
472 | if (count($this->getOption(self::OPTION_QUEUES)) === 1) { |
473 | return; |
474 | } |
475 | |
476 | if ( |
477 | count($this->getOption(self::OPTION_QUEUES)) != count(array_unique($this->getOption(self::OPTION_QUEUES))) |
478 | ) { |
479 | throw new \InvalidArgumentException( |
480 | 'There are duplicated Queue names. Please check the values of "' . self::OPTION_QUEUES |
481 | . '" in your queue dispatcher settings.' |
482 | ); |
483 | } |
484 | } |
485 | |
486 | /** |
487 | * @throws \InvalidArgumentException |
488 | */ |
489 | private function assertTasks() |
490 | { |
491 | if (empty($this->getLinkedTasks())) { |
492 | return; |
493 | } |
494 | |
495 | // check if every task is linked to a registered queue |
496 | $notRegisteredQueues = array_diff(array_values($this->getLinkedTasks()), $this->getQueueNames()); |
497 | |
498 | if (count($notRegisteredQueues)) { |
499 | throw new \LogicException( |
500 | 'Found not registered queue(s) linked to task(s): "' . implode('", "', $notRegisteredQueues) |
501 | . '". Please check the values of "' . self::OPTION_TASK_TO_QUEUE_ASSOCIATIONS |
502 | . '" in your queue dispatcher settings.' |
503 | ); |
504 | } |
505 | } |
506 | } |