Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
2.54% covered (danger)
2.54%
3 / 118
0.00% covered (danger)
0.00%
0 / 13
CRAP
0.00% covered (danger)
0.00%
0 / 1
RdsQueueBroker
2.54% covered (danger)
2.54%
3 / 118
0.00% covered (danger)
0.00%
0 / 13
701.80
0.00% covered (danger)
0.00%
0 / 1
 __construct
75.00% covered (warning)
75.00%
3 / 4
0.00% covered (danger)
0.00%
0 / 1
2.06
 __toPhpCode
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
2
 getPersistence
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
6
 getTableName
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 createQueue
0.00% covered (danger)
0.00%
0 / 21
0.00% covered (danger)
0.00%
0 / 1
30
 push
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
2
 doPop
0.00% covered (danger)
0.00%
0 / 27
0.00% covered (danger)
0.00%
0 / 1
30
 delete
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
2
 doDelete
0.00% covered (danger)
0.00%
0 / 9
0.00% covered (danger)
0.00%
0 / 1
6
 getTaskByTaskLogId
0.00% covered (danger)
0.00%
0 / 21
0.00% covered (danger)
0.00%
0 / 1
12
 changeTaskVisibility
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
2
 count
0.00% covered (danger)
0.00%
0 / 9
0.00% covered (danger)
0.00%
0 / 1
6
 getQueryBuilder
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
1<?php
2
3/**
4 * This program is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU General Public License
6 * as published by the Free Software Foundation; under version 2
7 * of the License (non-upgradable).
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the Free Software
16 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
17 *
18 * Copyright (c) 2017-2021 (original work) Open Assessment Technologies SA (under the project TAO-PRODUCT);
19 *
20 */
21
22namespace oat\taoTaskQueue\model\QueueBroker;
23
24use Doctrine\DBAL\FetchMode;
25use Doctrine\DBAL\Query\QueryBuilder;
26use Doctrine\DBAL\Schema\SchemaException;
27use Doctrine\DBAL\Schema\Schema;
28use Doctrine\DBAL\Schema\AbstractSchemaManager;
29use oat\tao\model\taskQueue\Queue\Broker\AbstractQueueBroker;
30use oat\tao\model\taskQueue\Task\TaskInterface;
31use oat\taoTaskQueue\model\Task\CallbackTaskDecorator;
32
33/**
34 * Storing messages/tasks in DB.
35 *
36 * @author Gyula Szucs <gyula@taotesting.com>
37 */
38class RdsQueueBroker extends AbstractQueueBroker
39{
40    public const ID = 'rds';
41
42    private $persistenceId;
43
44    /**
45     * @var \common_persistence_SqlPersistence
46     */
47    protected $persistence;
48
49    /**
50     * RdsQueueBroker constructor.
51     *
52     * @param string $persistenceId
53     * @param int $receiveTasks
54     */
55    public function __construct($persistenceId, $receiveTasks = 1)
56    {
57        parent::__construct($receiveTasks);
58
59        if (empty($persistenceId)) {
60            throw new \InvalidArgumentException("Persistence id needs to be set for " . __CLASS__);
61        }
62
63        $this->persistenceId = $persistenceId;
64    }
65
66    public function __toPhpCode()
67    {
68        return 'new ' . get_called_class() . '('
69            . \common_Utils::toHumanReadablePhpString($this->persistenceId)
70            . ', '
71            . \common_Utils::toHumanReadablePhpString($this->getNumberOfTasksToReceive())
72            . ')';
73    }
74
75    /**
76     * @return \common_persistence_SqlPersistence
77     */
78    protected function getPersistence()
79    {
80        if (is_null($this->persistence)) {
81            $this->persistence = $this->getServiceLocator()
82                ->get(\common_persistence_Manager::SERVICE_ID)
83                ->getPersistenceById($this->persistenceId);
84        }
85
86        return $this->persistence;
87    }
88
89    /**
90     * @return string
91     */
92    protected function getTableName()
93    {
94        return strtolower($this->getQueueNameWithPrefix());
95    }
96
97    /**
98     * Note: this method can be run multiple times because only the migrate queries (result of getMigrateSchemaSql)
99     * will be run.
100     *
101     * @inheritdoc
102     */
103    public function createQueue()
104    {
105        $persistence = $this->getPersistence();
106        /** @var AbstractSchemaManager $schemaManager */
107        $schemaManager = $persistence->getDriver()->getSchemaManager();
108
109        /** @var Schema $schema */
110        $schema = $schemaManager->createSchema();
111        $fromSchema = clone $schema;
112        try {
113            if (in_array($this->getTableName(), $schemaManager->getTables())) {
114                $schema->dropTable($this->getTableName());
115            }
116            $table = $schema->createTable($this->getTableName());
117            $table->addOption('engine', 'InnoDB');
118            $table->addColumn('id', 'integer', ["autoincrement" => true, "notnull" => true, "unsigned" => true]);
119            $table->addColumn('message', 'text', ["notnull" => true]);
120            $table->addColumn('visible', 'boolean', ["default" => 1]);
121            $table->addColumn('created_at', 'datetime', ['notnull' => true]);
122            $table->setPrimaryKey(['id']);
123            $table->addIndex(['created_at', 'visible'], 'IDX_created_at_visible_' . $this->getQueueName());
124        } catch (SchemaException $e) {
125            $this->logDebug('Schema of ' . $this->getTableName() . ' table already up to date.');
126        }
127
128        $queries = $persistence->getPlatForm()->getMigrateSchemaSql($fromSchema, $schema);
129
130        foreach ($queries as $query) {
131            $persistence->exec($query);
132        }
133
134        if ($queries) {
135            $this->logDebug('Queue ' . $this->getTableName() . ' created/updated in RDS.');
136        }
137    }
138
139    /**
140     * Insert a new task into the queue table.
141     *
142     * @param TaskInterface $task
143     * @return bool
144     */
145    public function push(TaskInterface $task)
146    {
147        return (bool) $this->getPersistence()->insert($this->getTableName(), [
148            'message' => $this->serializeTask($task),
149            'created_at' => $this->getPersistence()->getPlatForm()->getNowExpression()
150        ]);
151    }
152
153    /**
154     * Does the DBAL specific pop mechanism.
155     */
156    protected function doPop()
157    {
158        $this->getPersistence()->getPlatform()->beginTransaction();
159
160        $logContext = [
161            'Queue' => $this->getQueueNameWithPrefix()
162        ];
163
164        try {
165            $qb = $this->getQueryBuilder()
166                ->select('id, message')
167                ->from($this->getTableName())
168                ->andWhere('visible = :visible')
169                ->orderBy('created_at')
170                ->setMaxResults($this->getNumberOfTasksToReceive());
171
172            /**
173             * SELECT ... FOR UPDATE is used for locking
174             *
175             * @see https://dev.mysql.com/doc/refman/5.6/en/innodb-locking-reads.html
176             */
177            $sql = $qb->getSQL() . ' ' . $this->getPersistence()->getPlatForm()->getWriteLockSQL();
178
179            if ($dbResult = $this->getPersistence()->query($sql, ['visible' => 1])->fetchAll(\PDO::FETCH_ASSOC)) {
180                // set the received messages to invisible for other workers
181                $qb = $this->getQueryBuilder()
182                    ->update($this->getTableName())
183                    ->set('visible', ':visible')
184                    ->where('id IN (' . implode(',', array_column($dbResult, 'id')) . ')')
185                    ->setParameter('visible', 0);
186
187                $qb->execute();
188
189                foreach ($dbResult as $row) {
190                    if ($task = $this->unserializeTask($row['message'], $row['id'], $logContext)) {
191                        $task->setMetadata('RdsMessageId', $row['id']);
192                        $this->pushPreFetchedMessage($task);
193                    }
194                }
195            } else {
196                $this->logDebug('No task in the queue.', $logContext);
197            }
198
199            $this->getPersistence()->getPlatform()->commit();
200        } catch (\Exception $e) {
201            $this->getPersistence()->getPlatform()->rollBack();
202            $this->logError('Popping tasks failed with MSG: ' . $e->getMessage(), $logContext);
203        }
204    }
205
206    /**
207     * Delete the message after being processed by the worker.
208     *
209     * @param TaskInterface $task
210     */
211    public function delete(TaskInterface $task)
212    {
213        $this->doDelete($task->getMetadata('RdsMessageId'), [
214            'InternalMessageId' => $task->getId(),
215            'RdsMessageId' => $task->getMetadata('RdsMessageId')
216        ]);
217    }
218
219    /**
220     * @param string $id
221     * @param array  $logContext
222     * @return int
223     */
224    protected function doDelete($id, array $logContext = [])
225    {
226        try {
227            $this->getQueryBuilder()
228                ->delete($this->getTableName())
229                ->where('id = :id')
230                ->andWhere('visible = :visible')
231                ->setParameter('id', (int) $id)
232                ->setParameter('visible', 0)
233                ->execute();
234        } catch (\Exception $e) {
235            $this->logError('Deleting task failed with MSG: ' . $e->getMessage(), $logContext);
236        }
237    }
238
239    /**
240     * @TODO Make queue broker open/closed: https://oat-sa.atlassian.net/browse/ADF-556
241     */
242    public function getTaskByTaskLogId(string $taskLogId): ?CallbackTaskDecorator
243    {
244        $logId = substr($taskLogId, strpos($taskLogId, '#'));
245
246        $row = $this->getQueryBuilder()
247            ->select('id, message, visible, created_at')
248            ->from($this->getTableName())
249            ->andWhere('message LIKE :taskLogId')
250            ->setParameter('taskLogId', "%$logId%")
251            ->setMaxResults(1)
252            ->execute()
253            ->fetch(FetchMode::ASSOCIATIVE);
254
255        if (!$row) {
256            return null;
257        }
258
259        $task = $this->unserializeTask(
260            $row['message'],
261            $row['id'],
262            [
263                'Queue' => $this->getQueueNameWithPrefix()
264            ]
265        );
266
267        if (!$task) {
268            return null;
269        }
270
271        return new CallbackTaskDecorator($task, $row['id']);
272    }
273
274    /**
275     * @TODO Make queue broker open/closed: https://oat-sa.atlassian.net/browse/ADF-556
276     */
277    public function changeTaskVisibility(string $taskId, bool $visible): void
278    {
279        $this->getQueryBuilder()
280            ->update($this->getTableName())
281            ->set('visible', ':visible')
282            ->where('id = :id')
283            ->setParameter('visible', (int)$visible)
284            ->setParameter('id', $taskId)
285            ->execute();
286    }
287
288    public function count(): int
289    {
290        try {
291            $qb = $this->getQueryBuilder()
292                ->select('COUNT(id)')
293                ->from($this->getTableName())
294                ->andWhere('visible = :visible')
295                ->setParameter('visible', 1);
296
297            return (int) $qb->execute()->fetchColumn();
298        } catch (\Exception $e) {
299            $this->logError('Counting tasks failed with MSG: ' . $e->getMessage());
300        }
301
302        return 0;
303    }
304
305    /**
306     * @return QueryBuilder
307     */
308    private function getQueryBuilder()
309    {
310        return $this->getPersistence()->getPlatform()->getQueryBuilder();
311    }
312}