Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
0.00% covered (danger)
0.00%
0 / 40
0.00% covered (danger)
0.00%
0 / 17
CRAP
0.00% covered (danger)
0.00%
0 / 1
AbstractQueueBroker
0.00% covered (danger)
0.00%
0 / 40
0.00% covered (danger)
0.00%
0 / 17
600
0.00% covered (danger)
0.00%
0 / 1
 __construct
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 __toPhpCode
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 doPop
n/a
0 / 0
n/a
0 / 0
0
 doDelete
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
0
 pop
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
6
 hasPreFetchedMessages
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 popPreFetchedMessage
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 pushPreFetchedMessage
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 unserializeTask
0.00% covered (danger)
0.00%
0 / 6
0.00% covered (danger)
0.00%
0 / 1
6
 serializeTask
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 assertValidJson
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
20
 handleCallbackTask
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
12
 getActionResolver
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 setQueueName
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 getQueueName
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getQueueNameWithPrefix
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getNumberOfTasksToReceive
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getBrokerId
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
1<?php
2
3/**
4 * This program is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU General Public License
6 * as published by the Free Software Foundation; under version 2
7 * of the License (non-upgradable).
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the Free Software
16 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
17 *
18 * Copyright (c) 2017-2021 (original work) Open Assessment Technologies SA (under the project TAO-PRODUCT);
19 *
20 */
21
22namespace oat\tao\model\taskQueue\Queue\Broker;
23
24use oat\oatbox\action\ActionService;
25use oat\oatbox\action\ResolutionException;
26use oat\oatbox\log\LoggerAwareTrait;
27use oat\oatbox\PhpSerializable;
28use oat\oatbox\service\ConfigurableService;
29use oat\tao\model\taskQueue\QueueDispatcher;
30use oat\tao\model\taskQueue\Task\CallbackTaskInterface;
31use oat\tao\model\taskQueue\Task\TaskInterface;
32use oat\tao\model\taskQueue\Task\TaskSerializerService;
33use Zend\ServiceManager\ServiceLocatorAwareInterface;
34use Zend\ServiceManager\ServiceLocatorAwareTrait;
35
/**
 * Base class for queue brokers.
 *
 * It keeps an in-memory buffer of already fetched tasks and offers helpers for
 * (un)serializing tasks and for resolving the callable of callback tasks.
 * Concrete brokers provide the transport specific parts through doPop() and doDelete().
 *
 * @author Gyula Szucs <gyula@taotesting.com>
 */
abstract class AbstractQueueBroker implements QueueBrokerInterface, PhpSerializable, ServiceLocatorAwareInterface
{
    use LoggerAwareTrait;
    use ServiceLocatorAwareTrait;

    /**
     * @var mixed Value received by the constructor; it is normalised to a non-negative integer
     *            only when read through getNumberOfTasksToReceive().
     */
    private $numberOfTasksToReceive;

    /**
     * @var string|null Name of the queue, null until setQueueName() is called.
     */
    private $queueName;

    /**
     * @var \SplQueue Buffer of tasks that were fetched but not handed out by pop() yet.
     */
    private $preFetchedQueue;

    /**
     * @param int $receiveTasks Maximum amount of tasks that can be received when polling the queue; Default is 1.
     */
    public function __construct($receiveTasks = 1)
    {
        $this->numberOfTasksToReceive = $receiveTasks;
        $this->preFetchedQueue = new \SplQueue();
    }

    /**
     * Build the PHP expression that re-creates this broker.
     *
     * Only the number of tasks to receive is part of the generated expression;
     * the queue name and the buffered tasks are not.
     *
     * @return string
     */
    public function __toPhpCode()
    {
        return 'new ' . static::class . '('
            . \common_Utils::toHumanReadablePhpString($this->numberOfTasksToReceive) . ')';
    }

    /**
     * Do the specific pop mechanism related to the given broker.
     * Tasks need to be added to the internal pre-fetched queue.
     *
     * @return void
     */
    abstract protected function doPop();

    /**
     * Internal mechanism of deleting a message, specific for the given broker
     *
     * @param string $id
     * @param array $logContext
     * @return void
     */
    abstract protected function doDelete($id, array $logContext = []);

    /**
     * Return the next task.
     *
     * Buffered tasks are served first. When the buffer is empty, doPop() is called once
     * to give the concrete broker the chance to refill it.
     *
     * @return null|TaskInterface Null when no task is available even after doPop().
     */
    public function pop()
    {
        $message = $this->popPreFetchedMessage();

        if ($message === null) {
            $this->doPop();
            $message = $this->popPreFetchedMessage();
        }

        return $message;
    }

    /**
     * @inheritdoc
     */
    public function hasPreFetchedMessages(): bool
    {
        // Return a real boolean instead of relying on the implicit int => bool coercion of count().
        return !$this->preFetchedQueue->isEmpty();
    }

    /**
     * Pop a task from the internal queue.
     *
     * @return TaskInterface|null Null when the internal queue is empty.
     */
    private function popPreFetchedMessage()
    {
        if ($this->preFetchedQueue->isEmpty()) {
            return null;
        }

        return $this->preFetchedQueue->dequeue();
    }

    /**
     * Add a task to the internal queue.
     *
     * @param TaskInterface $task
     */
    protected function pushPreFetchedMessage(TaskInterface $task)
    {
        $this->preFetchedQueue->enqueue($task);
    }

    /**
     * Unserialize the given task JSON.
     *
     * If the task cannot be deserialized, the failure is logged and the task is deleted
     * straight away without processing it.
     *
     * @param string $taskJSON
     * @param string $idForDeletion An identification of the given task
     * @param array  $logContext
     * @return null|TaskInterface Null when the task could not be deserialized.
     */
    protected function unserializeTask($taskJSON, $idForDeletion, array $logContext = [])
    {
        /** @var TaskSerializerService $taskSerializer */
        $taskSerializer = $this->getServiceLocator()->get(TaskSerializerService::SERVICE_ID);

        try {
            return $taskSerializer->deserialize($taskJSON);
        } catch (\Exception $e) {
            // Keep a trace of the reason: the message is about to be removed for good.
            $this->logError(
                'Task cannot be unserialized and will be deleted: ' . $e->getMessage(),
                $logContext
            );
            $this->doDelete($idForDeletion, $logContext);

            return null;
        }
    }

    /**
     * Serialize the given task with the task serializer service.
     *
     * @param TaskInterface $task
     * @return mixed Whatever TaskSerializerService::serialize() returns.
     */
    protected function serializeTask(TaskInterface $task)
    {
        /** @var TaskSerializerService $taskSerializer */
        $taskSerializer = $this->getServiceLocator()->get(TaskSerializerService::SERVICE_ID);

        return $taskSerializer->serialize($task);
    }

    /**
     * Assert that the decoded task data is usable.
     *
     * The data is rejected when it is null, when json_last_error() reports an error or when
     * the task class name key is missing.
     *
     * NOTE: json_last_error() reflects the most recent JSON operation of the process, so this
     * method is only meaningful when called right after the json_decode() that produced $basicData.
     *
     * @param mixed $basicData Decoded task data.
     * @throws \Exception When the data does not pass the checks above.
     */
    protected function assertValidJson($basicData)
    {
        if (
            $basicData === null
            || json_last_error() !== JSON_ERROR_NONE
            || !isset($basicData[TaskInterface::JSON_TASK_CLASS_NAME_KEY])
        ) {
            throw new \Exception('Task JSON is invalid or does not contain the task class name.');
        }
    }

    /**
     * Resolve the callable of a callback task and store the resolved callable back into the task.
     *
     * A resolved callable implementing ServiceLocatorAwareInterface receives this broker's
     * service locator.
     *
     * @param CallbackTaskInterface $task
     * @param array $logContext
     * @throws \Exception When the callable cannot be resolved; the ResolutionException is chained as previous.
     */
    protected function handleCallbackTask(CallbackTaskInterface $task, array $logContext)
    {
        try {
            $callable = $this->getActionResolver()->resolve($task->getCallable());

            if ($callable instanceof ServiceLocatorAwareInterface) {
                $callable->setServiceLocator($this->getServiceLocator());
            }

            $task->setCallable($callable);
        } catch (ResolutionException $e) {
            $message = 'Callable/Action class ' . $task->getCallable() . ' does not exist';
            $this->logError($message, $logContext);

            throw new \Exception($message, 0, $e);
        }
    }

    /**
     * Fetch the action resolver from the service locator.
     *
     * @return ActionService|ConfigurableService|object
     */
    protected function getActionResolver()
    {
        return $this->getServiceLocator()->get(ActionService::SERVICE_ID);
    }

    /**
     * @param string $name
     * @return $this
     */
    public function setQueueName($name)
    {
        $this->queueName = $name;

        return $this;
    }

    /**
     * @return string The name given to setQueueName().
     */
    protected function getQueueName()
    {
        return $this->queueName;
    }

    /**
     * @return string The queue name prefixed with QueueDispatcher::QUEUE_PREFIX and an underscore.
     */
    protected function getQueueNameWithPrefix()
    {
        return sprintf('%s_%s', QueueDispatcher::QUEUE_PREFIX, $this->getQueueName());
    }

    /**
     * @inheritdoc
     */
    public function getNumberOfTasksToReceive()
    {
        // Cast and take the absolute value so a non-integer or negative setting still yields a usable count.
        return abs((int) $this->numberOfTasksToReceive);
    }

    /**
     * Return the identifier of the broker.
     *
     * The ID constant is not declared in this class; it is read with late static binding,
     * so it is presumably provided by the concrete broker or an implemented interface.
     *
     * @return string
     */
    public function getBrokerId(): string
    {
        return static::ID;
    }
}