Code Coverage
 
Lines
Functions and Methods
Classes and Traits
Total
3.26% covered (danger)
3.26%
3 / 92
0.00% covered (danger)
0.00%
0 / 16
CRAP
0.00% covered (danger)
0.00%
0 / 1
NewSqlQueueBroker
3.26% covered (danger)
3.26%
3 / 92
0.00% covered (danger)
0.00%
0 / 16
638.00
0.00% covered (danger)
0.00%
0 / 1
 __construct
75.00% covered (warning)
75.00%
3 / 4
0.00% covered (danger)
0.00%
0 / 1
2.06
 __toPhpCode
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
2
 createQueue
0.00% covered (danger)
0.00%
0 / 13
0.00% covered (danger)
0.00%
0 / 1
12
 push
0.00% covered (danger)
0.00%
0 / 6
0.00% covered (danger)
0.00%
0 / 1
2
 delete
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
2
 count
0.00% covered (danger)
0.00%
0 / 10
0.00% covered (danger)
0.00%
0 / 1
6
 doPop
0.00% covered (danger)
0.00%
0 / 11
0.00% covered (danger)
0.00%
0 / 1
12
 doDelete
0.00% covered (danger)
0.00%
0 / 9
0.00% covered (danger)
0.00%
0 / 1
6
 getSchemaProvider
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getQueryBuilder
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 getPersistence
0.00% covered (danger)
0.00%
0 / 5
0.00% covered (danger)
0.00%
0 / 1
6
 getTableName
0.00% covered (danger)
0.00%
0 / 1
0.00% covered (danger)
0.00%
0 / 1
2
 changeMessagesVisibility
0.00% covered (danger)
0.00%
0 / 7
0.00% covered (danger)
0.00%
0 / 1
2
 processMessages
0.00% covered (danger)
0.00%
0 / 4
0.00% covered (danger)
0.00%
0 / 1
12
 fetchVisibleMessages
0.00% covered (danger)
0.00%
0 / 8
0.00% covered (danger)
0.00%
0 / 1
2
 getLogContext
0.00% covered (danger)
0.00%
0 / 3
0.00% covered (danger)
0.00%
0 / 1
2
1<?php
2
3/**
4 * This program is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU General Public License
6 * as published by the Free Software Foundation; under version 2
7 * of the License (non-upgradable).
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the Free Software
16 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
17 *
18 * Copyright (c) 2020 (original work) Open Assessment Technologies SA (under the project TAO-PRODUCT);
19 *
20 */
21
22declare(strict_types=1);
23
24namespace oat\taoTaskQueue\model\QueueBroker;
25
26use common_persistence_Manager;
27use common_persistence_SqlPersistence;
28use Doctrine\DBAL\Connection;
29use Doctrine\DBAL\ParameterType;
30use Doctrine\DBAL\Query\QueryBuilder;
31use Doctrine\DBAL\Schema\AbstractSchemaManager;
32use Exception;
33use InvalidArgumentException;
34use oat\generis\Helper\UuidPrimaryKeyTrait;
35use oat\tao\model\taskQueue\Queue\Broker\AbstractQueueBroker;
36use oat\tao\model\taskQueue\Task\TaskInterface;
37use oat\taoTaskQueue\model\QueueBroker\storage\NewSqlSchema;
38use PDO;
39use Throwable;
40
41/**
42 * Stores messages/tasks in a NewSQL database.
43 */
44class NewSqlQueueBroker extends AbstractQueueBroker
45{
46    use UuidPrimaryKeyTrait;
47
48    public const ID = 'newsql';
49
50    /** @var string */
51    private $persistenceId;
52
53    /** @var common_persistence_SqlPersistence */
54    protected $persistence;
55
56    public function __construct(string $persistenceId, int $receiveTasks = 1)
57    {
58        parent::__construct($receiveTasks);
59
60        if (empty($persistenceId)) {
61            throw new InvalidArgumentException("Persistence id needs to be set for " . __CLASS__);
62        }
63
64        $this->persistenceId = $persistenceId;
65    }
66
67    public function __toPhpCode()
68    {
69        return 'new ' . get_called_class() . '('
70            . \common_Utils::toHumanReadablePhpString($this->persistenceId)
71            . ', '
72            . \common_Utils::toHumanReadablePhpString($this->getNumberOfTasksToReceive())
73            . ')';
74    }
75
76    /**
77     * Note: this method can be run multiple times because only the migrate queries
78     * (result of getMigrateSchemaSql) will be run.
79     *
80     * @inheritdoc
81     */
82    public function createQueue(): void
83    {
84        $persistence = $this->getPersistence();
85        /** @var AbstractSchemaManager $schemaManager */
86        $schemaManager = $persistence->getDriver()->getSchemaManager();
87        $schema = $schemaManager->createSchema();
88        $fromSchema = clone $schema;
89        try {
90            $schema->dropTable($this->getTableName());
91        } catch (Throwable $exception) {
92            $this->logDebug('Schema of ' . $this->getTableName() . ' table already up to date.');
93        }
94        // Create the table
95        $schema = $this->getSchemaProvider()
96            ->setQueueName($this->getQueueName())
97            ->getSchema($schema, $this->getTableName());
98        $queries = $persistence->getPlatForm()->getMigrateSchemaSql($fromSchema, $schema);
99        foreach ($queries as $query) {
100            $persistence->exec($query);
101        }
102    }
103
104    /**
105     * Insert a new task into the queue table.
106     */
107    public function push(TaskInterface $task): bool
108    {
109        return (bool)$this->getPersistence()->insert($this->getTableName(), [
110            'id' => $this->getUniquePrimaryKey(),
111            'message' => $this->serializeTask($task),
112            'created_at' => $this->getPersistence()->getPlatForm()->getNowExpression(),
113            'visible' => true,
114        ]);
115    }
116
117    public function delete(TaskInterface $task): void
118    {
119        $this->doDelete($task->getMetadata('NewSqlMessageId'), [
120            'InternalMessageId' => $task->getId(),
121            'NewSqlMessageId' => $task->getMetadata('NewSqlMessageId')
122        ]);
123    }
124
125    public function count(): int
126    {
127        try {
128            return (int)$this->getQueryBuilder()
129                ->select('COUNT(id)')
130                ->from($this->getTableName())
131                ->andWhere('visible = :visible')
132                ->setParameter('visible', true, ParameterType::BOOLEAN)
133                ->execute()
134                ->fetchColumn();
135        } catch (Exception $e) {
136            $this->logError('Counting tasks failed with MSG: ' . $e->getMessage());
137        }
138
139        return 0;
140    }
141
142
143    /**
144     * @inheritDoc
145     */
146    protected function doPop(): void
147    {
148        $this->getPersistence()->getPlatform()->beginTransaction();
149
150        $logContext = $this->getLogContext();
151
152        try {
153            $dbResult = $this->fetchVisibleMessages();
154
155            if ($dbResult) {
156                // set the received messages to invisible for other workers
157                $this->changeMessagesVisibility($dbResult);
158
159                $this->processMessages($dbResult, $logContext);
160            } else {
161                $this->logDebug('No task in the queue.', $logContext);
162            }
163
164            $this->getPersistence()->getPlatform()->commit();
165        } catch (Exception $e) {
166            $this->getPersistence()->getPlatform()->rollBack();
167            $this->logError('Popping tasks failed with MSG: ' . $e->getMessage(), $logContext);
168        }
169    }
170
171    protected function doDelete($id, array $logContext = []): void
172    {
173        try {
174            $this->getQueryBuilder()
175                ->delete($this->getTableName())
176                ->where('id = :id')
177                ->andWhere('visible = :visible')
178                ->setParameter('id', $id)
179                ->setParameter('visible', false, ParameterType::BOOLEAN)
180                ->execute();
181        } catch (Exception $e) {
182            $this->logError('Deleting task failed with MSG: ' . $e->getMessage(), $logContext);
183        }
184    }
185
186    private function getSchemaProvider(): NewSqlSchema
187    {
188        return $this->getServiceLocator()->get(NewSqlSchema::class);
189    }
190
191    private function getQueryBuilder(): QueryBuilder
192    {
193        return $this->getPersistence()->getPlatform()->getQueryBuilder();
194    }
195
196    private function getPersistence(): ?common_persistence_SqlPersistence
197    {
198        if (is_null($this->persistence)) {
199            $this->persistence = $this->getServiceLocator()
200                ->get(common_persistence_Manager::SERVICE_ID)
201                ->getPersistenceById($this->persistenceId);
202        }
203
204        return $this->persistence;
205    }
206
207    private function getTableName(): string
208    {
209        return strtolower($this->getQueueNameWithPrefix());
210    }
211
212    private function changeMessagesVisibility(array $dbResult): void
213    {
214        $qb = $this->getQueryBuilder()
215            ->update($this->getTableName())
216            ->set('visible', ':visible')
217            ->where('id IN (:ids)')
218            ->setParameter('visible', false, ParameterType::BOOLEAN)
219            ->setParameter('ids', array_column($dbResult, 'id'), Connection::PARAM_STR_ARRAY);
220
221        $qb->execute();
222    }
223
224    private function processMessages(array $dbResult, array $logContext): void
225    {
226        foreach ($dbResult as $row) {
227            if ($task = $this->unserializeTask($row['message'], $row['id'], $logContext)) {
228                $task->setMetadata('NewSqlMessageId', $row['id']);
229                $this->pushPreFetchedMessage($task);
230            }
231        }
232    }
233
234    private function fetchVisibleMessages(): array
235    {
236        $qb = $this->getQueryBuilder()
237            ->select('id, message')
238            ->from($this->getTableName())
239            ->where('visible = :visible')
240            ->orderBy('created_at')
241            ->setMaxResults($this->getNumberOfTasksToReceive());
242
243        /**
244         * SELECT ... FOR UPDATE is used for locking
245         */
246        $sql = $qb->getSQL() . ' ' . $this->getPersistence()->getPlatForm()->getWriteLockSQL();
247
248        return $this->getPersistence()->query($sql, ['visible' => true])->fetchAll(PDO::FETCH_ASSOC);
249    }
250
251    private function getLogContext(): array
252    {
253        return [
254            'Queue' => $this->getQueueNameWithPrefix()
255        ];
256    }
257}