Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
2.22% covered (danger)
2.22%
3 / 135
0.00% covered (danger)
0.00%
0 / 13
CRAP
0.00% covered (danger)
0.00%
0 / 1
SqsQueueBroker
2.22% covered (danger)
2.22%
3 / 135
0.00% covered (danger)
0.00%
0 / 13
1535.69
0.00% covered (danger)
0.00%
0 / 1
 __construct
75.00% covered (warning)
75.00%
3 / 4
0.00% covered (danger)
0.00%
0 / 1
2.06
 __toPhpCode
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
2
 getClient
0.00% covered (danger)
0.00%
0 / 9
0.00% covered (danger)
0.00%
0 / 1
12
 getCache
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
6
 createQueue
0.00% covered (danger)
0.00%
0 / 18
0.00% covered (danger)
0.00%
0 / 1
20
 push
0.00% covered (danger)
0.00%
0 / 20
0.00% covered (danger)
0.00%
0 / 1
20
 doPop
0.00% covered (danger)
0.00%
0 / 25
0.00% covered (danger)
0.00%
0 / 1
56
 delete
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
2
 doDelete
0.00% covered (danger)
0.00%
0 / 12
0.00% covered (danger)
0.00%
0 / 1
12
 count
0.00% covered (danger)
0.00%
0 / 11
0.00% covered (danger)
0.00%
0 / 1
20
 queueExists
0.00% covered (danger)
0.00%
0 / 22
0.00% covered (danger)
0.00%
0 / 1
42
 getUrlCacheKey
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getNumberOfTasksToReceive
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
6
1<?php
2
3/**
4 * This program is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU General Public License
6 * as published by the Free Software Foundation; under version 2
7 * of the License (non-upgradable).
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the Free Software
16 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
17 *
18 * Copyright (c) 2017 (original work) Open Assessment Technologies SA (under the project TAO-PRODUCT);
19 *
20 */
21
22namespace oat\taoTaskQueue\model\QueueBroker;
23
24use Aws\Exception\AwsException;
25use Aws\Sqs\SqsClient;
26use oat\awsTools\AwsClient;
27use oat\tao\model\taskQueue\Queue\Broker\AbstractQueueBroker;
28use oat\tao\model\taskQueue\Task\TaskInterface;
29
30/**
31 * Storing messages/tasks on AWS SQS.
32 *
33 * @author Gyula Szucs <gyula@taotesting.com>
34 */
35class SqsQueueBroker extends AbstractQueueBroker
36{
37    public const DEFAULT_AWS_CLIENT_KEY = 'generis/awsClient';
38    public const ID = 'sqs';
39
40    private $cacheId;
41
42    /**
43     * @var SqsClient
44     */
45    private $client;
46    private $queueUrl;
47
48    /**
49     * @var \common_cache_Cache
50     */
51    private $cache;
52
53    /**
54     * SqsQueueBroker constructor.
55     *
56     * @param string $cacheServiceId
57     * @param int $receiveTasks
58     */
59    public function __construct($cacheServiceId, $receiveTasks = 1)
60    {
61        parent::__construct($receiveTasks);
62
63        if (empty($cacheServiceId)) {
64            throw new \InvalidArgumentException("Cache Service needs to be set for " . __CLASS__);
65        }
66
67        $this->cacheId = $cacheServiceId;
68    }
69
70    public function __toPhpCode()
71    {
72        return 'new ' . get_called_class() . '('
73            . \common_Utils::toHumanReadablePhpString($this->cacheId)
74            . ', '
75            . \common_Utils::toHumanReadablePhpString($this->getNumberOfTasksToReceive())
76            . ')';
77    }
78
79    /**
80     * @return SqsClient
81     */
82    protected function getClient()
83    {
84        if (is_null($this->client)) {
85            if (!$this->getServiceLocator()->has(self::DEFAULT_AWS_CLIENT_KEY)) {
86                throw new \RuntimeException(
87                    'Unable to load driver for ' . __CLASS__
88                        . ', most likely generis/awsClient.conf.php does not exist.'
89                );
90            }
91
92            /** @var AwsClient $awsClient */
93            $awsClient = $this->getServiceLocator()->get(self::DEFAULT_AWS_CLIENT_KEY);
94
95            $this->client = $awsClient->getSqsClient();
96        }
97
98        return $this->client;
99    }
100
101    /**
102     * @return \common_cache_Cache
103     */
104    protected function getCache()
105    {
106        if (is_null($this->cache)) {
107            $this->cache = $this->getServiceLocator()->get($this->cacheId);
108        }
109
110        return $this->cache;
111    }
112
113    /**
114     * Creates queue.
115     */
116    public function createQueue()
117    {
118        try {
119            // Note: we are creating a Standard Queue for the time being.
120            // More development needed to be able to customize it, for example creating FIFO Queue or setting attributes
121            // from outside.
122            /** @see http://docs.aws.amazon.com/aws-sdk-php/v3/api/api-sqs-2012-11-05.html#createqueue */
123            $result = $this->getClient()->createQueue([
124                'QueueName' => $this->getQueueNameWithPrefix(),
125                'Attributes' => [
126                    'DelaySeconds' => 0,
127                    'VisibilityTimeout' => 600
128                ]
129            ]);
130
131            if ($result->hasKey('QueueUrl')) {
132                $this->queueUrl = $result->get('QueueUrl');
133
134                $this->getCache()->put($this->getUrlCacheKey(), $this->queueUrl);
135
136                $this->logDebug('Queue ' . $this->queueUrl . ' created and cached');
137            } else {
138                $this->logError('Queue ' . $this->getQueueNameWithPrefix() . ' not created');
139            }
140        } catch (AwsException $e) {
141            $this->logError(
142                'Creating queue ' . $this->getQueueNameWithPrefix() . ' failed with MSG: ' . $e->getMessage()
143            );
144
145            if (PHP_SAPI == 'cli') {
146                throw $e;
147            }
148        }
149    }
150
151    /**
152     * @param TaskInterface $task
153     * @return bool
154     */
155    public function push(TaskInterface $task)
156    {
157        // ensures that the SQS Queue exist
158        if (!$this->queueExists()) {
159            $this->createQueue();
160        }
161
162        $logContext = [
163            'QueueUrl' => $this->queueUrl,
164            'InternalMessageId' => $task->getId()
165        ];
166
167        try {
168            $result = $this->getClient()->sendMessage([
169                'MessageAttributes' => [],
170                'MessageBody' => $this->serializeTask($task),
171                'QueueUrl' => $this->queueUrl
172            ]);
173
174            if ($result->hasKey('MessageId')) {
175                $this->logDebug('Message pushed to SQS', array_merge($logContext, [
176                    'SqsMessageId' => $result->get('MessageId')
177                ]));
178                return true;
179            } else {
180                $this->logError('Message seems not received by SQS.', $logContext);
181            }
182        } catch (AwsException $e) {
183            $this->logError('Pushing message failed with MSG: ' . $e->getAwsErrorMessage(), $logContext);
184        }
185
186        return false;
187    }
188
189    /**
190     * Does the SQS specific pop mechanism.
191     *
192     * @see http://docs.aws.amazon.com/aws-sdk-php/v3/api/api-sqs-2012-11-05.html#shape-message
193     */
194    protected function doPop()
195    {
196        // ensures that the SQS Queue exist
197        if (!$this->queueExists()) {
198            $this->createQueue();
199        }
200
201        $logContext = [
202            'QueueUrl' => $this->queueUrl
203        ];
204
205        try {
206            $result = $this->getClient()->receiveMessage([
207                'AttributeNames' => [], //nothing
208                'MaxNumberOfMessages' => $this->getNumberOfTasksToReceive(),
209                'MessageAttributeNames' => [], //nothing
210                'QueueUrl' => $this->queueUrl,
211                'WaitTimeSeconds' => 20 //retrieving messages with Long Polling
212            ]);
213
214            if (is_array($result->get('Messages')) && count($result->get('Messages')) > 0) {
215                $this->logDebug('Received ' . count($result->get('Messages')) . ' messages.', $logContext);
216
217                foreach ($result->get('Messages') as $message) {
218                    $task = $this->unserializeTask($message['Body'], $message['ReceiptHandle'], [
219                        'SqsMessageId' => $message['MessageId']
220                    ]);
221
222                    if ($task) {
223                        $task->setMetadata('SqsMessageId', $message['MessageId']);
224                        $task->setMetadata('ReceiptHandle', $message['ReceiptHandle']);
225                        $this->pushPreFetchedMessage($task);
226                    }
227                }
228            } else {
229                $this->logDebug('No messages in queue.', $logContext);
230            }
231        } catch (AwsException $e) {
232            $this->logError('Popping tasks failed with MSG: ' . $e->getAwsErrorMessage(), $logContext);
233        }
234    }
235
236    /**
237     * @param TaskInterface $task
238     */
239    public function delete(TaskInterface $task)
240    {
241        $this->doDelete($task->getMetadata('ReceiptHandle'), [
242            'InternalMessageId' => $task->getId(),
243            'SqsMessageId' => $task->getMetadata('SqsMessageId')
244        ]);
245    }
246
247    /**
248     * Delete a task by its receipt.
249     *
250     * @param string $receipt
251     * @param array $logContext
252     */
253    protected function doDelete($receipt, array $logContext = [])
254    {
255        // ensures that the SQS Queue exist
256        if (!$this->queueExists()) {
257            $this->createQueue();
258        }
259
260        $logContext = array_merge([
261            'QueueUrl' => $this->queueUrl
262        ], $logContext);
263
264        try {
265            $this->getClient()->deleteMessage([
266                'QueueUrl' => $this->queueUrl,
267                'ReceiptHandle' => $receipt
268            ]);
269
270            $this->logDebug('Task deleted from queue.', $logContext);
271        } catch (AwsException $e) {
272            $this->logError('Deleting task failed with MSG: ' . $e->getAwsErrorMessage(), $logContext);
273        }
274    }
275
276    public function count(): int
277    {
278        // ensures that the SQS Queue exist
279        if (!$this->queueExists()) {
280            $this->createQueue();
281        }
282
283        try {
284            $result = $this->getClient()->getQueueAttributes([
285                'QueueUrl' => $this->queueUrl,
286                'AttributeNames' => ['ApproximateNumberOfMessages'],
287            ]);
288
289            if (isset($result['Attributes']['ApproximateNumberOfMessages'])) {
290                return (int) $result['Attributes']['ApproximateNumberOfMessages'];
291            }
292        } catch (AwsException $e) {
293            $this->logError('Counting tasks failed with MSG: ' . $e->getAwsErrorMessage());
294        }
295
296        return 0;
297    }
298
299    /**
300     * Checks if queue exists
301     *
302     * @return bool
303     */
304    protected function queueExists()
305    {
306        if (isset($this->queueUrl)) {
307            return true;
308        }
309
310        if ($this->getCache()->has($this->getUrlCacheKey())) {
311            $this->queueUrl = $this->getCache()->get($this->getUrlCacheKey());
312            return true;
313        }
314
315        try {
316            $result = $this->getClient()->getQueueUrl([
317                'QueueName' => $this->getQueueNameWithPrefix()
318            ]);
319
320            $this->queueUrl = $result->get('QueueUrl');
321
322            if ($result->hasKey('QueueUrl')) {
323                $this->queueUrl = $result->get('QueueUrl');
324            } else {
325                $this->logError('Queue url for' . $this->getQueueNameWithPrefix() . ' not fetched');
326            }
327
328            if ($this->queueUrl !== null) {
329                $this->getCache()->put($this->queueUrl, $this->getUrlCacheKey());
330                $this->logDebug('Queue url ' . $this->queueUrl . ' fetched and cached');
331                return true;
332            }
333        } catch (AwsException $e) {
334            $this->logWarning(
335                'Fetching queue url for ' . $this->getQueueNameWithPrefix() . ' failed. MSG: '
336                    . $e->getAwsErrorMessage()
337            );
338        }
339
340        return false;
341    }
342
343    /**
344     * @return string
345     */
346    private function getUrlCacheKey()
347    {
348        return $this->getQueueNameWithPrefix() . '_url';
349    }
350
351    /**
352     * SQS can return max 10 messages at once.
353     *
354     * @return int
355     */
356    public function getNumberOfTasksToReceive()
357    {
358        return parent::getNumberOfTasksToReceive() > 10 ? 10 : parent::getNumberOfTasksToReceive();
359    }
360}