Code Coverage |
||||||||||
Lines |
Functions and Methods |
Classes and Traits |
||||||||
Total | |
3.26% |
3 / 92 |
|
0.00% |
0 / 16 |
CRAP | |
0.00% |
0 / 1 |
NewSqlQueueBroker | |
3.26% |
3 / 92 |
|
0.00% |
0 / 16 |
638.00 | |
0.00% |
0 / 1 |
__construct | |
75.00% |
3 / 4 |
|
0.00% |
0 / 1 |
2.06 | |||
__toPhpCode | |
0.00% |
0 / 5 |
|
0.00% |
0 / 1 |
2 | |||
createQueue | |
0.00% |
0 / 13 |
|
0.00% |
0 / 1 |
12 | |||
push | |
0.00% |
0 / 6 |
|
0.00% |
0 / 1 |
2 | |||
delete | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
2 | |||
count | |
0.00% |
0 / 10 |
|
0.00% |
0 / 1 |
6 | |||
doPop | |
0.00% |
0 / 11 |
|
0.00% |
0 / 1 |
12 | |||
doDelete | |
0.00% |
0 / 9 |
|
0.00% |
0 / 1 |
6 | |||
getSchemaProvider | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getQueryBuilder | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
getPersistence | |
0.00% |
0 / 5 |
|
0.00% |
0 / 1 |
6 | |||
getTableName | |
0.00% |
0 / 1 |
|
0.00% |
0 / 1 |
2 | |||
changeMessagesVisibility | |
0.00% |
0 / 7 |
|
0.00% |
0 / 1 |
2 | |||
processMessages | |
0.00% |
0 / 4 |
|
0.00% |
0 / 1 |
12 | |||
fetchVisibleMessages | |
0.00% |
0 / 8 |
|
0.00% |
0 / 1 |
2 | |||
getLogContext | |
0.00% |
0 / 3 |
|
0.00% |
0 / 1 |
2 |
1 | <?php |
2 | |
3 | /** |
4 | * This program is free software; you can redistribute it and/or |
5 | * modify it under the terms of the GNU General Public License |
6 | * as published by the Free Software Foundation; under version 2 |
7 | * of the License (non-upgradable). |
8 | * |
9 | * This program is distributed in the hope that it will be useful, |
10 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
11 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
12 | * GNU General Public License for more details. |
13 | * |
14 | * You should have received a copy of the GNU General Public License |
15 | * along with this program; if not, write to the Free Software |
16 | * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. |
17 | * |
18 | * Copyright (c) 2020 (original work) Open Assessment Technologies SA (under the project TAO-PRODUCT); |
19 | * |
20 | */ |
21 | |
22 | declare(strict_types=1); |
23 | |
24 | namespace oat\taoTaskQueue\model\QueueBroker; |
25 | |
26 | use common_persistence_Manager; |
27 | use common_persistence_SqlPersistence; |
28 | use Doctrine\DBAL\Connection; |
29 | use Doctrine\DBAL\ParameterType; |
30 | use Doctrine\DBAL\Query\QueryBuilder; |
31 | use Doctrine\DBAL\Schema\AbstractSchemaManager; |
32 | use Exception; |
33 | use InvalidArgumentException; |
34 | use oat\generis\Helper\UuidPrimaryKeyTrait; |
35 | use oat\tao\model\taskQueue\Queue\Broker\AbstractQueueBroker; |
36 | use oat\tao\model\taskQueue\Task\TaskInterface; |
37 | use oat\taoTaskQueue\model\QueueBroker\storage\NewSqlSchema; |
38 | use PDO; |
39 | use Throwable; |
40 | |
/**
 * Queue broker storing messages/tasks in a NewSQL database table.
 *
 * Every task is kept as one row made of an id, the serialized message, a creation
 * date and a `visible` flag. Popping marks the fetched rows as not visible; deleting
 * removes rows that are not visible.
 */
class NewSqlQueueBroker extends AbstractQueueBroker
{
    use UuidPrimaryKeyTrait;

    public const ID = 'newsql';

    /** @var string Id used to look the persistence up in the persistence manager. */
    private $persistenceId;

    /** @var common_persistence_SqlPersistence|null Resolved lazily by getPersistence(). */
    protected $persistence;

    /**
     * @param string $persistenceId Id of the SQL persistence holding the queue table.
     * @param int    $receiveTasks  Forwarded as-is to the parent broker constructor.
     *
     * @throws InvalidArgumentException When $persistenceId is empty.
     */
    public function __construct(string $persistenceId, int $receiveTasks = 1)
    {
        parent::__construct($receiveTasks);

        if (empty($persistenceId)) {
            throw new InvalidArgumentException("Persistence id needs to be set for " . __CLASS__);
        }

        $this->persistenceId = $persistenceId;
    }

    /**
     * Builds the PHP expression that re-creates this broker with the same
     * persistence id and number of tasks to receive.
     */
    public function __toPhpCode()
    {
        return 'new ' . static::class . '('
            . \common_Utils::toHumanReadablePhpString($this->persistenceId)
            . ', '
            . \common_Utils::toHumanReadablePhpString($this->getNumberOfTasksToReceive())
            . ')';
    }

    /**
     * Note: this method can be run multiple times because only the migrate queries
     * (result of getMigrateSchemaSql) will be run.
     *
     * @inheritdoc
     */
    public function createQueue(): void
    {
        $persistence = $this->getPersistence();
        $tableName = $this->getTableName();

        /** @var AbstractSchemaManager $schemaManager */
        $schemaManager = $persistence->getDriver()->getSchemaManager();
        $schema = $schemaManager->createSchema();
        // Snapshot of the current database state, used as the "from" side of the diff.
        $fromSchema = clone $schema;

        // Remove an existing definition from the target schema so the schema provider
        // can register the table again; the migration then only contains the difference.
        if ($schema->hasTable($tableName)) {
            $schema->dropTable($tableName);
        } else {
            $this->logDebug('Table ' . $tableName . ' does not exist yet.');
        }

        $schema = $this->getSchemaProvider()
            ->setQueueName($this->getQueueName())
            ->getSchema($schema, $tableName);

        $queries = $persistence->getPlatForm()->getMigrateSchemaSql($fromSchema, $schema);
        foreach ($queries as $query) {
            $persistence->exec($query);
        }
    }

    /**
     * Insert a new task into the queue table.
     *
     * The row gets a freshly generated primary key and is stored as visible.
     *
     * @return bool The result of the insert cast to boolean.
     */
    public function push(TaskInterface $task): bool
    {
        $persistence = $this->getPersistence();

        $inserted = $persistence->insert($this->getTableName(), [
            'id' => $this->getUniquePrimaryKey(),
            'message' => $this->serializeTask($task),
            'created_at' => $persistence->getPlatForm()->getNowExpression(),
            'visible' => true,
        ]);

        return (bool)$inserted;
    }

    /**
     * Deletes the row referenced by the task's 'NewSqlMessageId' metadata.
     */
    public function delete(TaskInterface $task): void
    {
        $messageId = $task->getMetadata('NewSqlMessageId');

        $this->doDelete($messageId, [
            'InternalMessageId' => $task->getId(),
            'NewSqlMessageId' => $messageId,
        ]);
    }

    /**
     * Counts the visible rows of the queue table.
     *
     * @return int Number of visible rows, or 0 when the query fails (the failure is logged).
     */
    public function count(): int
    {
        try {
            return (int)$this->getQueryBuilder()
                ->select('COUNT(id)')
                ->from($this->getTableName())
                ->andWhere('visible = :visible')
                ->setParameter('visible', true, ParameterType::BOOLEAN)
                ->execute()
                ->fetchColumn();
        } catch (Exception $e) {
            $this->logError('Counting tasks failed with MSG: ' . $e->getMessage());
        }

        return 0;
    }

    /**
     * Fetches visible messages inside a transaction, marks them as not visible and
     * hands the unserialized tasks over as pre-fetched messages.
     *
     * Any failure rolls the transaction back and is logged. Exceptions are swallowed
     * after logging; other throwables (PHP Errors) are re-thrown once the transaction
     * has been rolled back, so the transaction is never left open.
     *
     * @inheritDoc
     */
    protected function doPop(): void
    {
        $platform = $this->getPersistence()->getPlatForm();
        $platform->beginTransaction();

        $logContext = $this->getLogContext();

        try {
            $dbResult = $this->fetchVisibleMessages();

            if ($dbResult) {
                // set the received messages to invisible for other workers
                $this->changeMessagesVisibility($dbResult);

                $this->processMessages($dbResult, $logContext);
            } else {
                $this->logDebug('No task in the queue.', $logContext);
            }

            $platform->commit();
        } catch (Throwable $e) {
            $platform->rollBack();
            $this->logError('Popping tasks failed with MSG: ' . $e->getMessage(), $logContext);

            // Only Exceptions are treated as recoverable; Errors keep propagating.
            if (!$e instanceof Exception) {
                throw $e;
            }
        }
    }

    /**
     * Deletes the row with the given id, provided it is not visible.
     *
     * @param mixed $id         Primary key of the row to delete.
     * @param array $logContext Context attached to the error log entry on failure.
     */
    protected function doDelete($id, array $logContext = []): void
    {
        try {
            $this->getQueryBuilder()
                ->delete($this->getTableName())
                ->where('id = :id')
                ->andWhere('visible = :visible')
                ->setParameter('id', $id)
                ->setParameter('visible', false, ParameterType::BOOLEAN)
                ->execute();
        } catch (Exception $e) {
            $this->logError('Deleting task failed with MSG: ' . $e->getMessage(), $logContext);
        }
    }

    /**
     * Returns the schema provider registered in the service locator.
     */
    private function getSchemaProvider(): NewSqlSchema
    {
        return $this->getServiceLocator()->get(NewSqlSchema::class);
    }

    /**
     * Returns a query builder obtained from the persistence platform.
     */
    private function getQueryBuilder(): QueryBuilder
    {
        return $this->getPersistence()->getPlatForm()->getQueryBuilder();
    }

    /**
     * Returns the persistence, resolving it through the persistence manager on first use.
     */
    private function getPersistence(): ?common_persistence_SqlPersistence
    {
        if (is_null($this->persistence)) {
            $this->persistence = $this->getServiceLocator()
                ->get(common_persistence_Manager::SERVICE_ID)
                ->getPersistenceById($this->persistenceId);
        }

        return $this->persistence;
    }

    /**
     * Returns the queue table name: the prefixed queue name in lower case.
     */
    private function getTableName(): string
    {
        return strtolower($this->getQueueNameWithPrefix());
    }

    /**
     * Sets `visible` to false for every row whose id appears in the given result set.
     *
     * @param array $dbResult Rows as returned by fetchVisibleMessages(); only 'id' is read.
     */
    private function changeMessagesVisibility(array $dbResult): void
    {
        $ids = array_column($dbResult, 'id');

        $this->getQueryBuilder()
            ->update($this->getTableName())
            ->set('visible', ':visible')
            ->where('id IN (:ids)')
            ->setParameter('visible', false, ParameterType::BOOLEAN)
            ->setParameter('ids', $ids, Connection::PARAM_STR_ARRAY)
            ->execute();
    }

    /**
     * Unserializes every fetched row and queues the resulting tasks as pre-fetched messages.
     *
     * Rows for which unserializeTask() returns a falsy value are skipped.
     *
     * @param array $dbResult   Rows with 'id' and 'message' keys.
     * @param array $logContext Context forwarded to unserializeTask().
     */
    private function processMessages(array $dbResult, array $logContext): void
    {
        foreach ($dbResult as $row) {
            $task = $this->unserializeTask($row['message'], $row['id'], $logContext);

            if (!$task) {
                continue;
            }

            // Remember the row id so delete() can find the row later on.
            $task->setMetadata('NewSqlMessageId', $row['id']);
            $this->pushPreFetchedMessage($task);
        }
    }

    /**
     * Selects id and message of the visible rows ordered by creation date, limited to
     * the number of tasks to receive.
     *
     * @return array Rows fetched as associative arrays.
     */
    private function fetchVisibleMessages(): array
    {
        $qb = $this->getQueryBuilder()
            ->select('id, message')
            ->from($this->getTableName())
            ->where('visible = :visible')
            ->orderBy('created_at')
            ->setMaxResults($this->getNumberOfTasksToReceive());

        $persistence = $this->getPersistence();

        /**
         * SELECT ... FOR UPDATE is used for locking: the platform's write lock clause
         * is appended to the generated SQL, which is why the statement is executed
         * through the persistence and the parameter is bound here.
         */
        $sql = $qb->getSQL() . ' ' . $persistence->getPlatForm()->getWriteLockSQL();

        return $persistence->query($sql, ['visible' => true])->fetchAll(PDO::FETCH_ASSOC);
    }

    /**
     * Returns the default log context, holding the prefixed queue name.
     */
    private function getLogContext(): array
    {
        return [
            'Queue' => $this->getQueueNameWithPrefix(),
        ];
    }
}