Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
0.00% |
0 / 40 |
|
0.00% |
0 / 17 |
CRAP | |
0.00% |
0 / 1 |
AbstractQueueBroker | |
0.00% |
0 / 40 |
|
0.00% |
0 / 17 |
600 | |
0.00% |
0 / 1 |
__construct | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
__toPhpCode | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
doPop | n/a |
0 / 0 |
n/a |
0 / 0 |
0 | |||||
doDelete | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
0 | |||
pop | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
6 | |||
hasPreFetchedMessages | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
popPreFetchedMessage | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
6 | |||
pushPreFetchedMessage | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
unserializeTask | |
0.00% |
0 / 6 |
|
0.00% |
0 / 1 |
6 | |||
serializeTask | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
assertValidJson | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
20 | |||
handleCallbackTask | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
12 | |||
getActionResolver | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
setQueueName | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
getQueueName | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getQueueNameWithPrefix | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getNumberOfTasksToReceive | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getBrokerId | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 |
1 | <?php |
2 | |
3 | /** |
4 | * This program is free software; you can redistribute it and/or |
5 | * modify it under the terms of the GNU General Public License |
6 | * as published by the Free Software Foundation; under version 2 |
7 | * of the License (non-upgradable). |
8 | * |
9 | * This program is distributed in the hope that it will be useful, |
10 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
11 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
12 | * GNU General Public License for more details. |
13 | * |
14 | * You should have received a copy of the GNU General Public License |
15 | * along with this program; if not, write to the Free Software |
16 | * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
17 | * |
18 | * Copyright (c) 2017-2021 (original work) Open Assessment Technologies SA (under the project TAO-PRODUCT); |
19 | * |
20 | */ |
21 | |
22 | namespace oat\tao\model\taskQueue\Queue\Broker; |
23 | |
24 | use oat\oatbox\action\ActionService; |
25 | use oat\oatbox\action\ResolutionException; |
26 | use oat\oatbox\log\LoggerAwareTrait; |
27 | use oat\oatbox\PhpSerializable; |
28 | use oat\oatbox\service\ConfigurableService; |
29 | use oat\tao\model\taskQueue\QueueDispatcher; |
30 | use oat\tao\model\taskQueue\Task\CallbackTaskInterface; |
31 | use oat\tao\model\taskQueue\Task\TaskInterface; |
32 | use oat\tao\model\taskQueue\Task\TaskSerializerService; |
33 | use Zend\ServiceManager\ServiceLocatorAwareInterface; |
34 | use Zend\ServiceManager\ServiceLocatorAwareTrait; |
35 | |
/**
 * Common base for task queue brokers.
 *
 * Holds the queue name, the maximum number of tasks to receive per poll and an
 * in-memory FIFO buffer of tasks that were already fetched from the underlying
 * queue. Concrete brokers implement {@see doPop()} and {@see doDelete()} and are
 * expected to define an `ID` class constant, which {@see getBrokerId()} returns.
 *
 * @author Gyula Szucs <gyula@taotesting.com>
 */
abstract class AbstractQueueBroker implements QueueBrokerInterface, PhpSerializable, ServiceLocatorAwareInterface
{
    use LoggerAwareTrait;
    use ServiceLocatorAwareTrait;

    /**
     * Raw value received by the constructor; normalised on read by getNumberOfTasksToReceive().
     *
     * @var int|mixed
     */
    private $numberOfTasksToReceive;

    /**
     * Queue name; stays null until setQueueName() is called.
     *
     * @var string|null
     */
    private $queueName;

    /**
     * FIFO buffer of tasks that were fetched but not yet handed out by pop().
     *
     * @var \SplQueue
     */
    private $preFetchedQueue;

    /**
     * @param int $receiveTasks Maximum amount of tasks that can be received when polling the queue; Default is 1.
     */
    public function __construct($receiveTasks = 1)
    {
        $this->numberOfTasksToReceive = $receiveTasks;
        $this->preFetchedQueue = new \SplQueue();
    }

    /**
     * Builds the PHP expression that re-creates this broker: `new <concrete class>(<receiveTasks>)`.
     *
     * Only the constructor argument is exported; the queue name and the
     * pre-fetched buffer are not part of the generated code.
     *
     * @return string
     */
    public function __toPhpCode()
    {
        return 'new ' . static::class . '('
            . \common_Utils::toHumanReadablePhpString($this->numberOfTasksToReceive) . ')';
    }

    /**
     * Do the specific pop mechanism related to the given broker.
     * Tasks need to be added to the internal pre-fetched queue
     * (see {@see pushPreFetchedMessage()}).
     *
     * @return void
     */
    abstract protected function doPop();

    /**
     * Internal mechanism of deleting a message, specific for the given broker
     *
     * @param string $id
     * @param array $logContext
     * @return void
     */
    abstract protected function doDelete($id, array $logContext = []);

    /**
     * Returns the next task.
     *
     * Serves from the pre-fetched buffer first; when the buffer is empty it asks
     * the concrete broker to fetch (doPop()) and then tries the buffer once more.
     *
     * @return null|TaskInterface Null when nothing is buffered even after fetching.
     */
    public function pop()
    {
        // if there is item in the pre-fetched queue, let's return that
        $task = $this->popPreFetchedMessage();
        if ($task !== null) {
            return $task;
        }

        $this->doPop();

        return $this->popPreFetchedMessage();
    }

    /**
     * @inheritdoc
     */
    public function hasPreFetchedMessages(): bool
    {
        // Explicit comparison: count() yields an int, and the declared return type is bool.
        return $this->preFetchedQueue->count() > 0;
    }

    /**
     * Pop a task from the internal queue.
     *
     * @return TaskInterface|null Null when the internal queue is empty.
     */
    private function popPreFetchedMessage()
    {
        if (!$this->hasPreFetchedMessages()) {
            return null;
        }

        return $this->preFetchedQueue->dequeue();
    }

    /**
     * Add a task to the internal queue.
     *
     * @param TaskInterface $task
     */
    protected function pushPreFetchedMessage(TaskInterface $task)
    {
        $this->preFetchedQueue->enqueue($task);
    }

    /**
     * Unserialize the given task JSON.
     *
     * If deserialization throws, the failure is logged and the task is deleted
     * straight away (via doDelete()) without processing it.
     *
     * @param string $taskJSON
     * @param string $idForDeletion An identification of the given task
     * @param array $logContext
     * @return null|TaskInterface Null when the task could not be deserialized.
     */
    protected function unserializeTask($taskJSON, $idForDeletion, array $logContext = [])
    {
        /** @var TaskSerializerService $taskSerializer */
        $taskSerializer = $this->getServiceLocator()->get(TaskSerializerService::SERVICE_ID);

        try {
            return $taskSerializer->deserialize($taskJSON);
        } catch (\Exception $e) {
            // Leave a trace of why the task is being dropped before it is removed.
            $this->logError('Task could not be unserialized: ' . $e->getMessage(), $logContext);
            $this->doDelete($idForDeletion, $logContext);

            return null;
        }
    }

    /**
     * Serializes the task through the TaskSerializerService.
     *
     * @param TaskInterface $task
     * @return mixed Whatever the serializer service returns.
     */
    protected function serializeTask(TaskInterface $task)
    {
        /** @var TaskSerializerService $taskSerializer */
        $taskSerializer = $this->getServiceLocator()->get(TaskSerializerService::SERVICE_ID);

        return $taskSerializer->serialize($task);
    }

    /**
     * Checks that decoded task data is usable.
     *
     * The data is accepted only when it is not null, json_last_error() reports
     * no error, and the task class name key is set. Note that json_last_error()
     * is global state: it reflects the most recent JSON call made before this
     * method runs.
     *
     * @param mixed $basicData Decoded task data.
     * @throws \Exception When any of the above conditions fails.
     */
    protected function assertValidJson($basicData)
    {
        if (
            $basicData === null
            || json_last_error() !== JSON_ERROR_NONE
            || !isset($basicData[TaskInterface::JSON_TASK_CLASS_NAME_KEY])
        ) {
            throw new \Exception('Invalid task data: JSON decoding failed or the task class name is missing');
        }
    }

    /**
     * Resolves the callable of a callback task and stores the resolved instance back on the task.
     *
     * A resolved callable that is service-locator aware receives this broker's service locator.
     *
     * @param CallbackTaskInterface $task
     * @param array $logContext
     * @throws \Exception When the callable cannot be resolved; the ResolutionException is attached as previous.
     */
    protected function handleCallbackTask(CallbackTaskInterface $task, array $logContext)
    {
        try {
            $callable = $this->getActionResolver()->resolve($task->getCallable());

            if ($callable instanceof ServiceLocatorAwareInterface) {
                $callable->setServiceLocator($this->getServiceLocator());
            }

            $task->setCallable($callable);
        } catch (ResolutionException $e) {
            $message = 'Callable/Action class ' . $task->getCallable() . ' does not exist';
            $this->logError($message, $logContext);

            // Same exception type as before, now carrying the message and the root cause.
            throw new \Exception($message, 0, $e);
        }
    }

    /**
     * Fetches the action resolver service from the service locator.
     *
     * @return ActionService|ConfigurableService|object
     */
    protected function getActionResolver()
    {
        return $this->getServiceLocator()->get(ActionService::SERVICE_ID);
    }

    /**
     * Sets the name of the queue this broker works with.
     *
     * @param string $name
     * @return $this
     */
    public function setQueueName($name)
    {
        $this->queueName = $name;

        return $this;
    }

    /**
     * @return string The name given to setQueueName().
     */
    protected function getQueueName()
    {
        return $this->queueName;
    }

    /**
     * @return string QueueDispatcher::QUEUE_PREFIX and the queue name joined by an underscore.
     */
    protected function getQueueNameWithPrefix()
    {
        return sprintf("%s_%s", QueueDispatcher::QUEUE_PREFIX, $this->getQueueName());
    }

    /**
     * @inheritdoc
     */
    public function getNumberOfTasksToReceive()
    {
        // The constructor value is stored as-is; cast to int and drop the sign on read.
        return abs((int) $this->numberOfTasksToReceive);
    }

    /**
     * Returns the broker identifier.
     *
     * Resolved with late static binding, so the concrete broker class must define an ID constant.
     *
     * @return string
     */
    public function getBrokerId(): string
    {
        return static::ID;
    }
}