Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
0.00% |
0 / 41 |
|
0.00% |
0 / 15 |
CRAP | |
0.00% |
0 / 1 |
AbstractQueueBroker | |
0.00% |
0 / 41 |
|
0.00% |
0 / 15 |
600 | |
0.00% |
0 / 1 |
__construct | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
__toPhpCode | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
doPop | n/a |
0 / 0 |
n/a |
0 / 0 |
0 | |||||
doDelete | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
0 | |||
pop | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
6 | |||
popPreFetchedMessage | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
6 | |||
pushPreFetchedMessage | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
unserializeTask | |
0.00% |
0 / 10 |
|
0.00% |
0 / 1 |
20 | |||
serializeTask | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
assertValidJson | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
20 | |||
handleCallbackTask | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
12 | |||
getActionResolver | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
setQueueName | |
0.00% |
0 / 2 |
|
0.00% |
0 / 1 |
2 | |||
getQueueName | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getQueueNameWithPrefix | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getNumberOfTasksToReceive | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 |
1 | <?php |
2 | |
3 | /** |
4 | * This program is free software; you can redistribute it and/or |
5 | * modify it under the terms of the GNU General Public License |
6 | * as published by the Free Software Foundation; under version 2 |
7 | * of the License (non-upgradable). |
8 | * |
9 | * This program is distributed in the hope that it will be useful, |
10 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
11 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
12 | * GNU General Public License for more details. |
13 | * |
14 | * You should have received a copy of the GNU General Public License |
15 | * along with this program; if not, write to the Free Software |
16 | * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
17 | * |
18 | * Copyright (c) 2017 (original work) Open Assessment Technologies SA (under the project TAO-PRODUCT); |
19 | * |
20 | */ |
21 | |
22 | namespace oat\taoTaskQueue\model\QueueBroker; |
23 | |
24 | use oat\oatbox\PhpSerializable; |
25 | use oat\oatbox\service\ConfigurableService; |
26 | use oat\oatbox\action\ActionService; |
27 | use oat\oatbox\action\ResolutionException; |
28 | use oat\oatbox\log\LoggerAwareTrait; |
29 | use oat\taoTaskQueue\model\QueueDispatcher; |
30 | use oat\taoTaskQueue\model\Task\CallbackTaskInterface; |
31 | use oat\taoTaskQueue\model\Task\TaskFactory; |
32 | use oat\taoTaskQueue\model\Task\TaskInterface; |
33 | use Zend\ServiceManager\ServiceLocatorAwareInterface; |
34 | use Zend\ServiceManager\ServiceLocatorAwareTrait; |
35 | |
36 | /** |
37 | * Class AbstractQueueBroker |
38 | * |
39 | * @deprecated Use \oat\tao\model\taskQueue\Queue\Broker\AbstractQueueBroker |
40 | * |
41 | * @author Gyula Szucs <gyula@taotesting.com> |
42 | */ |
43 | abstract class AbstractQueueBroker implements QueueBrokerInterface, PhpSerializable, ServiceLocatorAwareInterface |
44 | { |
45 | use LoggerAwareTrait; |
46 | use ServiceLocatorAwareTrait; |
47 | |
48 | private $numberOfTasksToReceive; |
49 | private $queueName; |
50 | private $preFetchedQueue; |
51 | |
52 | /** |
53 | * AbstractMessageBroker constructor. |
54 | * |
55 | * @param int $receiveTasks Maximum amount of tasks that can be received when polling the queue; Default is 1. |
56 | */ |
57 | public function __construct($receiveTasks = 1) |
58 | { |
59 | $this->numberOfTasksToReceive = $receiveTasks; |
60 | $this->preFetchedQueue = new \SplQueue(); |
61 | } |
62 | |
63 | public function __toPhpCode() |
64 | { |
65 | return 'new ' . get_called_class() |
66 | . '(' . \common_Utils::toHumanReadablePhpString($this->numberOfTasksToReceive) . ')'; |
67 | } |
68 | |
69 | /** |
70 | * Do the specific pop mechanism related to the given broker. |
71 | * Tasks need to be added to the internal pre-fetched queue. |
72 | * |
73 | * @return void |
74 | */ |
75 | abstract protected function doPop(); |
76 | |
77 | /** |
78 | * Internal mechanism of deleting a message, specific for the given broker |
79 | * |
80 | * @param string $id |
81 | * @param array $logContext |
82 | * @return void |
83 | */ |
84 | abstract protected function doDelete($id, array $logContext = []); |
85 | |
86 | /** |
87 | * @return null|TaskInterface |
88 | */ |
89 | public function pop() |
90 | { |
91 | // if there is item in the pre-fetched queue, let's return that |
92 | if ($message = $this->popPreFetchedMessage()) { |
93 | return $message; |
94 | } |
95 | |
96 | $this->doPop(); |
97 | |
98 | return $this->popPreFetchedMessage(); |
99 | } |
100 | |
101 | /** |
102 | * Pop a task from the internal queue. |
103 | * |
104 | * @return TaskInterface|null |
105 | */ |
106 | private function popPreFetchedMessage() |
107 | { |
108 | if ($this->preFetchedQueue->count()) { |
109 | return $this->preFetchedQueue->dequeue(); |
110 | } |
111 | |
112 | return null; |
113 | } |
114 | |
115 | /** |
116 | * Add a task to the internal queue. |
117 | * |
118 | * @param TaskInterface $task |
119 | */ |
120 | protected function pushPreFetchedMessage(TaskInterface $task) |
121 | { |
122 | $this->preFetchedQueue->enqueue($task); |
123 | } |
124 | |
125 | /** |
126 | * Unserialize the given task JSON. |
127 | * |
128 | * If the json is not valid, it deletes the task straight away without processing it. |
129 | * |
130 | * @param string $taskJSON |
131 | * @param string $idForDeletion An identification of the given task |
132 | * @param array $logContext |
133 | * @return null|TaskInterface |
134 | */ |
135 | protected function unserializeTask($taskJSON, $idForDeletion, array $logContext = []) |
136 | { |
137 | try { |
138 | $basicData = json_decode($taskJSON, true); |
139 | $this->assertValidJson($basicData); |
140 | |
141 | $task = TaskFactory::build($basicData); |
142 | |
143 | if ($task instanceof CallbackTaskInterface && is_string($task->getCallable())) { |
144 | $this->handleCallbackTask($task, $logContext); |
145 | } |
146 | |
147 | return $task; |
148 | } catch (\Exception $e) { |
149 | $this->doDelete($idForDeletion, $logContext); |
150 | |
151 | return null; |
152 | } |
153 | } |
154 | |
155 | /** |
156 | * @param TaskInterface $task |
157 | * @return mixed |
158 | */ |
159 | protected function serializeTask(TaskInterface $task) |
160 | { |
161 | return json_encode($task); |
162 | } |
163 | |
164 | /** |
165 | * @param $basicData |
166 | * @throws \Exception |
167 | */ |
168 | protected function assertValidJson($basicData) |
169 | { |
170 | if ( |
171 | ($basicData !== null |
172 | && json_last_error() === JSON_ERROR_NONE |
173 | && isset($basicData[TaskInterface::JSON_TASK_CLASS_NAME_KEY])) === false |
174 | ) { |
175 | throw new \Exception(); |
176 | } |
177 | } |
178 | |
179 | /** |
180 | * @param CallbackTaskInterface $task |
181 | * @param array $logContext |
182 | * @throws \Exception |
183 | */ |
184 | protected function handleCallbackTask(CallbackTaskInterface $task, array $logContext) |
185 | { |
186 | try { |
187 | $callable = $this->getActionResolver()->resolve($task->getCallable()); |
188 | |
189 | if ($callable instanceof ServiceLocatorAwareInterface) { |
190 | $callable->setServiceLocator($this->getServiceLocator()); |
191 | } |
192 | |
193 | $task->setCallable($callable); |
194 | } catch (ResolutionException $e) { |
195 | $this->logError('Callable/Action class ' . $task->getCallable() . ' does not exist', $logContext); |
196 | |
197 | throw new \Exception(); |
198 | } |
199 | } |
200 | |
201 | /** |
202 | * @return ActionService|ConfigurableService|object |
203 | */ |
204 | protected function getActionResolver() |
205 | { |
206 | return $this->getServiceLocator()->get(ActionService::SERVICE_ID); |
207 | } |
208 | |
209 | /** |
210 | * @param string $name |
211 | * @return $this |
212 | */ |
213 | public function setQueueName($name) |
214 | { |
215 | $this->queueName = $name; |
216 | |
217 | return $this; |
218 | } |
219 | |
220 | /** |
221 | * @return string |
222 | */ |
223 | protected function getQueueName() |
224 | { |
225 | return $this->queueName; |
226 | } |
227 | |
228 | /** |
229 | * @return string |
230 | */ |
231 | protected function getQueueNameWithPrefix() |
232 | { |
233 | return sprintf("%s_%s", QueueDispatcher::QUEUE_PREFIX, $this->getQueueName()); |
234 | } |
235 | |
236 | /** |
237 | * @inheritdoc |
238 | */ |
239 | public function getNumberOfTasksToReceive() |
240 | { |
241 | return abs((int) $this->numberOfTasksToReceive); |
242 | } |
243 | } |