Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
0.00% covered (danger)
0.00%
0 / 41
0.00% covered (danger)
0.00%
0 / 15
CRAP
0.00% covered (danger)
0.00%
0 / 1
AbstractQueueBroker
0.00% covered (danger)
0.00%
0 / 41
0.00% covered (danger)
0.00%
0 / 15
600
0.00% covered (danger)
0.00%
0 / 1
 __construct
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 __toPhpCode
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 doPop
n/a
0 / 0
n/a
0 / 0
0
 doDelete
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
0
 pop
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
6
 popPreFetchedMessage
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 pushPreFetchedMessage
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 unserializeTask
0.00% covered (danger)
0.00%
0 / 10
0.00% covered (danger)
0.00%
0 / 1
20
 serializeTask
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 assertValidJson
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
20
 handleCallbackTask
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
12
 getActionResolver
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 setQueueName
0.00% covered (danger)
0.00%
0 / 2
0.00% covered (danger)
0.00%
0 / 1
2
 getQueueName
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getQueueNameWithPrefix
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getNumberOfTasksToReceive
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
1<?php
2
3/**
4 * This program is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU General Public License
6 * as published by the Free Software Foundation; under version 2
7 * of the License (non-upgradable).
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the Free Software
16 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
17 *
18 * Copyright (c) 2017 (original work) Open Assessment Technologies SA (under the project TAO-PRODUCT);
19 *
20 */
21
22namespace oat\taoTaskQueue\model\QueueBroker;
23
24use oat\oatbox\PhpSerializable;
25use oat\oatbox\service\ConfigurableService;
26use oat\oatbox\action\ActionService;
27use oat\oatbox\action\ResolutionException;
28use oat\oatbox\log\LoggerAwareTrait;
29use oat\taoTaskQueue\model\QueueDispatcher;
30use oat\taoTaskQueue\model\Task\CallbackTaskInterface;
31use oat\taoTaskQueue\model\Task\TaskFactory;
32use oat\taoTaskQueue\model\Task\TaskInterface;
33use Zend\ServiceManager\ServiceLocatorAwareInterface;
34use Zend\ServiceManager\ServiceLocatorAwareTrait;
35
36/**
37 * Class AbstractQueueBroker
38 *
39 * @deprecated Use \oat\tao\model\taskQueue\Queue\Broker\AbstractQueueBroker
40 *
41 * @author Gyula Szucs <gyula@taotesting.com>
42 */
43abstract class AbstractQueueBroker implements QueueBrokerInterface, PhpSerializable, ServiceLocatorAwareInterface
44{
45    use LoggerAwareTrait;
46    use ServiceLocatorAwareTrait;
47
48    private $numberOfTasksToReceive;
49    private $queueName;
50    private $preFetchedQueue;
51
52    /**
53     * AbstractMessageBroker constructor.
54     *
55     * @param int $receiveTasks Maximum amount of tasks that can be received when polling the queue; Default is 1.
56     */
57    public function __construct($receiveTasks = 1)
58    {
59        $this->numberOfTasksToReceive = $receiveTasks;
60        $this->preFetchedQueue = new \SplQueue();
61    }
62
63    public function __toPhpCode()
64    {
65        return 'new ' . get_called_class()
66            . '(' . \common_Utils::toHumanReadablePhpString($this->numberOfTasksToReceive) . ')';
67    }
68
69    /**
70     * Do the specific pop mechanism related to the given broker.
71     * Tasks need to be added to the internal pre-fetched queue.
72     *
73     * @return void
74     */
75    abstract protected function doPop();
76
77    /**
78     * Internal mechanism of deleting a message, specific for the given broker
79     *
80     * @param string $id
81     * @param array $logContext
82     * @return void
83     */
84    abstract protected function doDelete($id, array $logContext = []);
85
86    /**
87     * @return null|TaskInterface
88     */
89    public function pop()
90    {
91        // if there is item in the pre-fetched queue, let's return that
92        if ($message = $this->popPreFetchedMessage()) {
93            return $message;
94        }
95
96        $this->doPop();
97
98        return $this->popPreFetchedMessage();
99    }
100
101    /**
102     * Pop a task from the internal queue.
103     *
104     * @return TaskInterface|null
105     */
106    private function popPreFetchedMessage()
107    {
108        if ($this->preFetchedQueue->count()) {
109            return $this->preFetchedQueue->dequeue();
110        }
111
112        return null;
113    }
114
115    /**
116     * Add a task to the internal queue.
117     *
118     * @param TaskInterface $task
119     */
120    protected function pushPreFetchedMessage(TaskInterface $task)
121    {
122        $this->preFetchedQueue->enqueue($task);
123    }
124
125    /**
126     * Unserialize the given task JSON.
127     *
128     * If the json is not valid, it deletes the task straight away without processing it.
129     *
130     * @param string $taskJSON
131     * @param string $idForDeletion An identification of the given task
132     * @param array  $logContext
133     * @return null|TaskInterface
134     */
135    protected function unserializeTask($taskJSON, $idForDeletion, array $logContext = [])
136    {
137        try {
138            $basicData = json_decode($taskJSON, true);
139            $this->assertValidJson($basicData);
140
141            $task = TaskFactory::build($basicData);
142
143            if ($task instanceof CallbackTaskInterface && is_string($task->getCallable())) {
144                $this->handleCallbackTask($task, $logContext);
145            }
146
147            return $task;
148        } catch (\Exception $e) {
149            $this->doDelete($idForDeletion, $logContext);
150
151            return null;
152        }
153    }
154
155    /**
156     * @param TaskInterface $task
157     * @return mixed
158     */
159    protected function serializeTask(TaskInterface $task)
160    {
161        return json_encode($task);
162    }
163
164    /**
165     * @param $basicData
166     * @throws \Exception
167     */
168    protected function assertValidJson($basicData)
169    {
170        if (
171            ($basicData !== null
172            && json_last_error() === JSON_ERROR_NONE
173            && isset($basicData[TaskInterface::JSON_TASK_CLASS_NAME_KEY])) === false
174        ) {
175            throw new \Exception();
176        }
177    }
178
179    /**
180     * @param CallbackTaskInterface $task
181     * @param array $logContext
182     * @throws \Exception
183     */
184    protected function handleCallbackTask(CallbackTaskInterface $task, array $logContext)
185    {
186        try {
187            $callable = $this->getActionResolver()->resolve($task->getCallable());
188
189            if ($callable instanceof ServiceLocatorAwareInterface) {
190                $callable->setServiceLocator($this->getServiceLocator());
191            }
192
193            $task->setCallable($callable);
194        } catch (ResolutionException $e) {
195            $this->logError('Callable/Action class ' . $task->getCallable() . ' does not exist', $logContext);
196
197            throw new \Exception();
198        }
199    }
200
201    /**
202     * @return ActionService|ConfigurableService|object
203     */
204    protected function getActionResolver()
205    {
206        return $this->getServiceLocator()->get(ActionService::SERVICE_ID);
207    }
208
209    /**
210     * @param string $name
211     * @return $this
212     */
213    public function setQueueName($name)
214    {
215        $this->queueName = $name;
216
217        return $this;
218    }
219
220    /**
221     * @return string
222     */
223    protected function getQueueName()
224    {
225        return $this->queueName;
226    }
227
228    /**
229     * @return string
230     */
231    protected function getQueueNameWithPrefix()
232    {
233        return sprintf("%s_%s", QueueDispatcher::QUEUE_PREFIX, $this->getQueueName());
234    }
235
236    /**
237     * @inheritdoc
238     */
239    public function getNumberOfTasksToReceive()
240    {
241        return abs((int) $this->numberOfTasksToReceive);
242    }
243}